无法使用Spark Continuous Streaming处理数据



我正在开发一个实时流媒体应用程序,该应用程序从Kafka代理中轮询数据,我正在调整以前默认使用Spark结构化流媒体的代码(使用微批处理(。然而,我不知道如何使用连续流而不是微批处理流来获得类似的行为。这是一段有效的代码:

query = df.writeStream 
.foreachBatch(foreach_batch_func) 
.start()

这就是我迄今为止尝试的连续流媒体:

query = df 
.writeStream 
.foreach(example_func) 
.trigger(continuous = '1 second') 
.start()

应用程序弹出以下错误:

连续执行不支持在org.apache.spark.sql.execution.streaming.continuous.ContinuousDataSourceRDD.compute(ContinuousDataSourceRDD.scala:76)上重试任务

我使用的是Spark(pyspark(3.0.1 w/Scala 2.12,Kafka 2.6.0

当我提交应用程序时,我正在添加jarorg.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1

Spark Structured Streaming中的连续处理模式仅适用于某些查询类型。

根据关于连续处理的文档,支持以下查询,但似乎不支持您的查询:

从Spark 2.4开始,在连续处理模式下只支持以下类型的查询。

Operations: Only map-like Dataset/DataFrame operations are supported in continuous mode, that is, only projections (select, map, flatMap, mapPartitions, etc.) and selections (where, filter, etc.).
All SQL functions are supported except aggregation functions (since aggregations are not yet supported), current_timestamp() and current_date() (deterministic computations using time is challenging).
Sources:
Kafka source: All options are supported.
Rate source: Good for testing. Only options that are supported in the continuous mode are numPartitions and rowsPerSecond.
Sinks:
Kafka sink: All options are supported.
Memory sink: Good for debugging.
Console sink: Good for debugging. All options are supported. Note that the console will print every checkpoint interval that you have specified in the continuous trigger.

如果您看到此异常,则它不是作业失败的根本原因。您应该检查其他任务失败的原因。您可以尝试spark.task.maxFailures=1,这将抑制ContinuousTaskRetryException,更容易确定真正的根本原因。

仅当满足以下所有条件时才使用连续处理模式:

  1. 希望将端到端延迟严格控制在100ms以下
  2. 至少有一次处理是可以接受的,这意味着消息可能多次处理
  3. 当任何任务或执行器失败时,由于cp模式不支持任务重试,可以允许手动重新提交作业

我的建议是不要使用cp模式,除非你真的需要这样低的延迟,因为cp模式仍然是实验性的。更多详细信息,请参阅https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#continuous-处理

最新更新