Apache Spark™3.0中用于结构化流的新UI概述

本文的翻译是在“数据工程师”课程开始前夕准备的










结构化流首先在Apache Spark 2.0中引入。该平台已成为构建分布式流应用程序的最佳选择。 SQL / Dataset / DataFrame API和内置的Spark函数的统一使开发人员更容易实现其复杂的基本需求,例如流聚合,流连接和窗口支持。自从结构化流发布以来,就像我们在Spark Streaming(如DStream)中所做的那样,开发人员普遍要求改进流控制。在Apache Spark 3.0中,我们发布了用于结构化流的新UI。



新的UI结构化流提供了一种简便的方法,可通过可操作的见解和统计信息来监视所有流作业,从而使调试过程中的问题排查更加容易,并通过实时指标提高了生产可见性。UI提供了两组统计信息:1)关于流查询作业的聚合信息,以及2)关于流请求的详细统计信息,包括输入速率,处理速率,输入行,批处理持续时间,操作持续时间等。



有关流查询作业的汇总信息



当开发人员提交流SQL查询时,它会显示在“结构化流”选项卡中,其中包括活动的流查询和已完成的查询。结果表将提供有关流请求的一些基本信息,包括请求名称,状态,ID,运行ID,发送时间,请求持续时间,最后一个数据包ID,以及汇总信息,例如平均接收率和平均处理率。流式请求状态有三种类型:“运行中”,“完成”和“失败”。在完成的流式传输请求表中列出了所有已完成和失败的请求。 “错误”列显示失败的请求异常的详细信息。







我们可以通过单击“运行ID”链接来查看有关流式传输请求的详细统计信息。



详细的统计信息



“统计信息”页面显示指标,包括摄取/处理速率,延迟和详细的操作持续时间,这些指标对于了解流式请求的状态非常有用,从而可以轻松调试请求处理中的异常。









它包含以下指标:



  • 输入速率:汇总(所有来源)的数据到达速率。
  • 处理速率:Spark处理数据的总速率(跨所有来源)。
  • 批次持续时间:每个批次的持续时间。
  • 操作持续时间:执行各种操作所花费的时间(以毫秒为单位)。


监视的事务如下:



  • addBatch:从源读取微型批次的输入数据,对其进行处理并写入要同步的批次的输出数据所花费的时间。这通常会占用微型批处理的大部分时间。
  • getBatch:准备逻辑请求以从源读取当前微包装的输入数据所花费的时间。
  • getOffset:花费时间询问来源是否有新输入。
  • walCommit:将偏移量写入元数据日志。
  • queryPlanning:创建执行计划。


应该注意的是,并不是所有列出的操作都将显示在UI中。不同类型的数据源有不同的操作,因此可以在一个流请求中执行某些列出的操作。



使用UI对流性能进行故障排除



在本节中,我们将研究几种情况,其中新的UI结构化流表明异常情况正在发生。一个高级演示请求如下所示,在每种情况下,我们都将假设一些先决条件:



import java.util.UUID

val bootstrapServers = ...
val topics = ...
val checkpointLocation = "/tmp/temporary-" + UUID.randomUUID.toString

val lines = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", bootstrapServers)
    .option("subscribe", topics)
    .load()
    .selectExpr("CAST(value AS STRING)")
    .as[String]

val wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count()

val query = wordCounts.writeStream
    .outputMode("complete")
    .format("console")
    .option("checkpointLocation", checkpointLocation)
    .start()


由于处理能力不足而导致延迟增加



在第一种情况下,我们运行一个请求以尽快处理Apache Kafka数据。对于每个批次,流作业都会处理Kafka中的所有可用数据。如果处理能力不足以处理分组数据,则等待时间将迅速增加。最直观的判断是输入行和批处理持续时间将线性增长。Input Rows参数指定流作业每秒最多可处理8000个写入。但是当前的输入速率约为每秒20,000条记录。我们可以为线程作业提供更多运行资源,或者可以添加足够的分区来处理与生产者保持同步所需的所有消费者。







稳定但高延迟



这种情况与前一种情况有何不同?延迟不会增加,但保持稳定,如以下屏幕快照所示:







我们发现处理速率可以在相同的输入速率下保持稳定。这意味着作业的处理能力足以处理输入数据。但是,每批的处理时间,即延迟,仍为20秒。高延迟的主要原因是每个批次中的数据过多。通常,我们可以通过增加这项工作的并行度来减少等待时间。在为Spark任务添加了10个以上的Kafka分区和10个内核之后,我们发现等待时间约为5秒-远远好于20秒。







使用操作持续时间图进行故障排除



“操作持续时间”图表显示执行各种操作所花费的时间(以毫秒为单位)。这对于了解每个批次的时间并简化故障排除很有用。让我们以Apache Spark社区中的性能改进工作“ SPARK-30915:在寻找最新的批次ID时避免读取元数据日志文件”为例。

在进行此改进之前,压缩后的元数据日志变得巨大时,压缩后的每个后续批处理都比其他批处理花费更多的时间。







检查代码后,发现并修复了不必要的读取压缩日志文件的问题。以下操作持续时间图确认了预期的效果:







对未来的计划



如上所示,新的UI结构化流将通过具有有关流请求的更多有用信息来帮助开发人员更好地控制其流工作。作为早期版本,新的UI仍在开发中,并将在以后的版本中进行改进。在不久的将来可能会实现一些功能,包括但不限于以下功能:



  • 了解有关流查询执行的更多信息:最新数据,水印,数据状态指标等。
  • Spark History Server上的结构化Streaming UI支持。
  • 有关异常行为的更多明显线索:延迟等。


尝试新的用户界面



在新的Databricks Runtime 7.1中的Apache Spark 3.0中尝试这个新的Spark Streaming UI。如果您使用的是Databricks笔记本,这也将为您提供一种简便的方法来观察笔记本中任何流请求的状态并管理您的请求您可以注册一个免费的Databricks帐户,并在几分钟内免费开始使用,而无需任何信用信息。






DWH中的数据质量是数据仓库的一致性。免费网络研讨会。






推荐阅读:



数据构建工具,或数据仓库和冰沙

在Delta Lake Dive中常见的功能使用Apache Arrow在Python中进行模式实施和演进

高速Apache Parquet



All Articles