Spark模式实践中的演变

亲爱的读者,大家好!



本文是Neoflex大数据解决方案业务线的首席顾问,详细介绍了使用Apache Spark构建可变结构店面的选项。



作为数据分析项目的一部分,通常会出现基于松散结构的数据构建集市的任务。



通常,这些是保存为JSON或XML的日志或来自各种系统的响应。将数据上传到Hadoop,然后您需要从中构建一个展示柜。我们可以组织对创建的店面的访问,例如通过Impala。



在这种情况下,目标店面的布局以前是未知的。而且,该图仍然依赖于数据,因此仍无法预先绘制,我们正在处理这些结构非常薄弱的​​数据。



例如,今天记录了以下答案:



{source: "app1", error_code: ""}


明天,以下答案来自同一系统:



{source: "app1", error_code: "error", description: "Network error"}


结果,应该在店面中添加另一个字段-描述,而没人知道它是否会出现。



在此类数据上创建集市的任务是相当标准的,Spark为此提供了许多工具。JSON和XML都支持解析原始数据,并且为以前未知的模式提供了schemaEvolution支持。



乍一看,解决方案看起来很简单。我们需要使用JSON提取文件夹并将其读入数据框。Spark将创建一个架构并将嵌套的数据转换为结构。然后,需要通过在Hive Metastore中注册展示柜,将所有内容保存在镶木地板中(Impala也支持)。



一切似乎都很简单。



但是,从文档中的简短示例尚不清楚在实践中如何处理许多问题。



该文档描述了一种不用于创建店面,而是用于将JSON或XML读入数据框的方法。



即,仅给出了如何读取和解析JSON的信息:



df = spark.read.json(path...)


这足以使数据可供Spark使用。



实际上,该方案比仅从文件夹读取JSON文件并创建数据框要复杂得多。情况看起来像这样:已经有一个展示柜,每天都有新数据出现,需要将它们添加到展示柜中,不要忘记方案可能有所不同。



构建店面的常用方案如下:



步骤1.将数据加载到Hadoop,然后每天重新加载并添加到新分区。原来,该文件夹具有按天划分的初始数据。



第2步。在初始化启动过程中,Spark将读取并解析此文件夹。生成的数据框以可用于分析的格式保存,例如以拼花形式保存,然后可以导入到Impala中。这将创建一个目标展示柜,其中包含到目前为止已累积的所有数据。



步骤3.创建一个下载,每天将更新店面。

问题产生于增加的负载,需要对陈列柜进行分区以及支持陈列柜的总体方案的问题。



让我们举个例子。假设已完成构建存储的第一步,并配置了将JSON文件导出到文件夹。



从它们创建数据框,然后将其另存为展示柜不是问题。这是您可以在Spark文档中轻松找到的第一步:



df = spark.read.option("mergeSchema", True).json(".../*") 
df.printSchema()

root 
|-- a: long (nullable = true) 
|-- b: string (nullable = true) 
|-- c: struct (nullable = true) |    
|-- d: long (nullable = true)


一切似乎都很好。



我们读取并解析了JSON,然后将数据帧另存为木地板,并以任何方便的方式向Hive注册:



df.write.format(“parquet”).option('path','<External Table Path>').saveAsTable('<Table Name>')


我们有一个展示柜。



但是,第二天,添加了来自源的新数据。我们有一个带有JSON的文件夹,以及一个基于此文件夹创建的展示柜。从源加载下一个数据块后,数据集市将用完一天的数据。



合理的解决方案是按天划分店面,这将允许每隔一天添加一个新的分区。此机制也是众所周知的,Spark允许您单独编写分区。



首先,我们进行初始化加载,如上所述保存数据,仅添加分区。此操作称为店面初始化,并且仅执行一次:



df.write.partitionBy("date_load").mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable)


第二天,我们仅加载一个新分区:



df.coalesce(1).write.mode("overwrite").parquet(dbpath + "/" + db + "/" + destTable +"/date_load=" + date_load + "/")


剩下的就是向Hive重新注册以更新架构。

但是,这是出现问题的地方。



第一个问题。迟早无法读取生成的拼花地板。这与镶木地板和JSON处理空字段的方式不同有关。



让我们考虑一个典型的情况。例如,JSON昨天到达:



 1: {"a": {"b": 1}},


今天,相同的JSON看起来像这样:



 2: {"a": null}


假设我们有两个不同的分区,每个分区一行。

当我们读取整个原始数据时,Spark将能够确定类型,并理解“ a”是“结构”类型的字段,而嵌套字段“ b”是INT类型。但是,如果每个分区分别保存,则会获得具有不兼容分区方案的镶木地板:



df1 (a: <struct<"b": INT>>)
df2 (a: STRING NULLABLE)


这种情况众所周知,因此,特别添加了一个选项-解析初始数据时,请删除空白字段:



df = spark.read.json("...", dropFieldIfAllNull=True)


在这种情况下,实木复合地板将由可以一起读取的分区组成。

尽管那些在实践中做到这一点的人会苦笑。为什么?因为可能还会出现另外两种情况。或三个。或四个。第一个几乎肯定会出现的是,数字类型在不同的JSON文件中看起来会有所不同。例如,{intField:1}和{intField:1.1}。如果在一个部分中找到了此类字段,则架构合并将正确读取所有内容,从而得出最准确的类型。但是如果不同,则其中一个将具有intField:int,另一个将具有intField:double。



有以下标志可以处理这种情况:



df = spark.read.json("...", dropFieldIfAllNull=True, primitivesAsString=True)


现在,我们有了一个分区所在的文件夹,可以将其读取到单个数据帧和整个店面的有效拼花地板中。是?没有。



请记住,我们在Hive中注册了该表。蜂巢在字段名称中不区分大小写,而拼花地板则区分大小写。因此,对于Hive而言,具有以下模式的分区:field1:int和Field1:int相同,但对于Spark则不同。记住要小写字段名称。



在那之后,一切似乎都很好。



但是,并非都那么简单。出现第二个也是众所周知的问题。由于每个新分区都是单独保存的,因此Spark服务文件将位于分区文件夹中,例如,_SUCCESS操作成功标志。尝试镶木地板时将引发错误。为避免这种情况,您需要通过禁止Spark将服务文件添加到文件夹来设置配置:



hadoopConf = sc._jsc.hadoopConfiguration()
hadoopConf.set("parquet.enable.summary-metadata", "false")
hadoopConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")


看来现在每天都有新的镶木地板分区被添加到目标店面文件夹中,该分区存储了当天的解析数据。我们预先确保没有分区存在数据类型冲突。



但是,摆在我们面前的是第三个问题。现在,在Hive中,具有错误方案的表现在是未知的通用方案,因为每个新分区很可能在方案中引入了失真。



您需要重新注册该表。这可以很简单地完成:再次读取店面镶木地板,获取架构并基于该架构创建DDL,使用该DDL将Hive中的文件夹重新注册为外部表,从而更新目标店面架构。



我们面临第四个问题。第一次注册该表时,我们依赖于Spark。现在我们自己做,您需要记住,拼花字段可以以对于Hive无效的字符开头。例如,Spark抛出了无法在“ corrupt_record”字段中解析的行。如果不进行转义,则无法向Hive注册该字段。



知道了这一点,我们得到了方案:



f_def = ""
for f in pf.dtypes:
  if f[0] != "date_load":
    f_def = f_def + "," + f[0].replace("_corrupt_record", "`_corrupt_record`") + " " + f[1].replace(":", "`:").replace("<", "<`").replace(",", ",`").replace("array<`", "array<") 
table_define = "CREATE EXTERNAL TABLE jsonevolvtable (" + f_def[1:] + " ) "
table_define = table_define + "PARTITIONED BY (date_load string) STORED AS PARQUET LOCATION '/user/admin/testJson/testSchemaEvolution/pq/'"
hc.sql("drop table if exists jsonevolvtable")
hc.sql(table_define)


代码(“ _corrupt_record”,“`_corrupt_record`”)+“ + + [1] .replace(”:“,”`:“)。Replace(” <“,” <`“)。Replace(”,“ ,“,`”)。replace(“ array <`”,“ array <”)使DDL安全,即代替:



create table tname (_field1 string, 1field string)


使用诸如“ _field1,1field”之类的字段名称,将在转义字段名称的地方创建一个安全的DDL:创建表“ tname”(“ _ field1”字符串,“ 1field”字符串)。



出现了一个问题:如何正确获取具有完整架构的数据框(在pf代码中)?我怎么得到这个pf?这是第五个问题。从具有目标店面的镶木地板文件的文件夹中重新读取所有分区的架构?这是最安全的方法,但最困难的一种。



该架构已在Hive中。您可以通过组合整个表和新分区的架构来获得新架构。因此,您需要从Hive中获取表架构,并将其与新的分区架构合并。这可以通过从Hive读取测试元数据,将其保存到临时文件夹并使用Spark一次读取两个分区来完成。



基本上,您拥有所需的一切:Hive中的原始表架构和新分区。我们也有数据。剩下的就是获得一个新的架构,该架构结合了店面架构和来自已创建分区的新字段:



from pyspark.sql import HiveContext
from pyspark.sql.functions import lit
hc = HiveContext(spark)
df = spark.read.json("...", dropFieldIfAllNull=True)
df.write.mode("overwrite").parquet(".../date_load=12-12-2019")
pe = hc.sql("select * from jsonevolvtable limit 1")
pe.write.mode("overwrite").parquet(".../fakePartiton/")
pf = spark.read.option("mergeSchema", True).parquet(".../date_load=12-12-2019/*", ".../fakePartiton/*")


接下来,如上一片段所示,我们将创建用于注册表的DDL。

如果整个链条都正常工作,即-进行了初始化加载,并且在Hive中创建了正确创建的表,那么我们将获得更新的表架构。



最后一个问题是,您不能仅将一个分区添加到Hive表中,因为它将被破坏。您需要强制Hive修复分区结构:



from pyspark.sql import HiveContext
hc = HiveContext(spark) 
hc.sql("MSCK REPAIR TABLE " + db + "." + destTable)


读取JSON并基于其创建店面的简单任务转化为克服了许多隐性难题,必须分别寻求解决方案。尽管这些解决方案很简单,但是要花很长时间才能找到。



为了实施展示柜的结构,我必须:



  • 将分区添加到店面,摆脱服务文件
  • 处理Spark键入的原始数据中的空字段
  • 将简单类型转换为字符串
  • 将字段名称转换为小写
  • Hive中单独的数据转储和表注册(DDL创建)
  • 请记住转义可能与Hive不兼容的字段名称
  • 学习在Hive中更新表的注册


总而言之,我们注意到建造展示柜的决定掩盖了许多陷阱。因此,如果在实施过程中遇到困难,最好联系具有成功专业知识的经验丰富的合作伙伴。



感谢您阅读本文,希望您发现有用的信息。



All Articles