在具有不同数据类型的生锈假人数据集上的PySpark ML中进行模型开发

您是否已经知道如何在PySpark ML中使用多种数据类型?没有?然后,您急需访问我们。



图片



你好!我想详细介绍一个有趣的,但不幸的是,不是Spark文档中的主题:如何在PySpark ML中的具有不同数据类型(字符串和数字)的数据集上训练模型?之所以写这篇文章,是因为需要在Internet上浏览几天以找到包含代码的必要文章,因为Spark的官方教程提供了一个示例,该示例不仅使用一种数据类型的符号,而且通常使用一种符号,还提供了有关如何使用的信息。几列数据类型更多,没有。但是,在详细研究了PySpark处理数据的功能之后,我设法编写了工作代码并了解了一切如何发生,并希望与您分享。如此全速前进,朋友们!



首先,让我们导入工作所需的所有库,然后我们将详细分析代码,以使任何自重的“生锈茶壶”(顺便说一句,我最近)都将了解所有内容:



#  
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import *
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import pyspark.sql.functions as sf
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.regression import GBTRegressor
#other types of regression models
#     
#from pyspark.ml.regression import LinearRegression
#from pyspark.ml.regression import RandomForestRegressor
#from pyspark.ml.regression import GeneralizedLinearRegression
#from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator


现在,让我们创建一个(本地)Spark上下文和一个Spark会话,并通过在屏幕上显示它来检查一切是否正常。创建Spark会话是在Spark中使用数据集的起点:



#  
sc = SparkContext('local')
spark = SparkSession(sc)
spark






有一个用于处理数据的工具,现在让我们加载它。本文使用的数据集来自Kaggle机器学习竞赛网站:

https

//www.kaggle.com/unitednations/international-greenhouse-gas-emissions,下载后以.csv格式存储在path_csv中,并具有以下选项:



  • 标头:如果在我们的文件中第一行是标头,则设置为“ true”
  • 定界符:我们放置一个符号,用符号将一行的数据分隔开,通常是“,”或“;”。
  • inferSchema:如果为true,则PySpark将自动检测每列的类型,否则您必须自己编写


#   .csv  path_csv
path_csv = 'greenhouse_gas_inventory_data_data.csv'
data = spark.read.format("csv")\
        .option("header", "true")\
        .option("delimiter", ",")\
        .option("inferSchema", "true")\
        .load(path_csv)


为了更好地了解我们正在处理哪种数据,让我们看一下其中的几行:



#   
data.show()




我们还要看看数据集中有多少行:

#  
data.select('year').count()






最后,让我们推断数据的类型,正如我们记得的那样,我们要求PySpark使用选项(“ inferSchema”,“ true”)自动确定:



#     
data.printSchema()






现在,让我们继续学习主要课程-处理不同数据类型的几种迹象。Spark可以在转换后的数据上训练模型,其中预测列是向量,而具有特征的列也是向量,这使任务复杂化。但是我们不放弃,并且在PySpark中训练模型,我们将使用Pipeline,我们将在其中传递一定的行动计划(可变阶段):



  1. 步骤label_stringIdx:我们将要预测的值数据集的列转换为Spark向量字符串,并将其重命名为带有参数handleInvalid ='keep'的标签,这意味着我们的预测列支持null
  2. stringIndexer步骤:将字符串列转换为Spark分类字符串
  3. encoder: ()
  4. assembler: Spark, , VectorAssembler(), ( ) (assemblerInputs) «features»
  5. gbt: PySpark ML GBTRegressor,


#value -      - 
stages = []
label_stringIdx = StringIndexer(inputCol = 'value', outputCol = 'label', handleInvalid = 'keep')
stages += [label_stringIdx]

#depend on categorical columns: country and types of emission
#   :    
categoricalColumns = ['country_or_area', 'category']
for categoricalCol in categoricalColumns:
    #        
    stringIndexer = StringIndexer(inputCol = categoricalCol,
                                  outputCol = categoricalCol + 'Index',
                                  handleInvalid = 'keep')
    encoder = OneHotEncoder(inputCol=stringIndexer.getOutputCol(),
                            outputCol=categoricalCol + "classVec")
    stages += [stringIndexer, encoder]

#   : 
numericCols = ['year']
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
#    - - 
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]


让我们将数据集分别以70%到30%的喜欢比率分为训练样本和测试样本,然后开始使用梯度回归增强树(GBTRegressor)训练模型,该树应基于先前组合为一个“特征”向量的特征来预测“标签”向量迭代限制为maxIter = 10:



#       (30% )
(trainingData, testData) = data.randomSplit([0.7, 0.3])

#  (   )
gbt = GBTRegressor(labelCol="label", featuresCol="features", maxIter=10)
stages += [gbt]

#   stages    
pipeline = Pipeline(stages=stages)


现在我们只需要向计算机发送一个行动计划和一个训练数据集:



#  
model = pipeline.fit(trainingData)

#     
predictions = model.transform(testData)


让我们保存模型,以便我们随时可以重新使用它而无需重新培训:



# 
pipeline.write().overwrite().save('model/gbtregr_model')


如果您决定再次使用训练有素的模型进行预测,则只需编写:



#     
load_model = pipeline.read().load('model/gbtregr_model')




因此,我们已经看到了使用Python语言处理大数据的工具PySpark如何实现与不同数据类型的多个功能列一起使用的方法。



现在是时候将其应用于您的模型了...



All Articles