仅在至少有n行时才执行流处理



我在kafka消费者处有以下SPARK SQL/流媒体查询,当批处理大小达到特定尺寸n时,我如何指定提取应为条件,否则消费者应缓冲处理之前的元素,因此,每当我要执行逻辑时,都可以保证我具有N大小为N的精确Dataset<VideoEventData>。当前代码:

Dataset<VideoEventData> ds = spark
      .readStream()
      .format("kafka")
      .option("kafka.bootstrap.servers", prop.getProperty("kafka.bootstrap.servers"))
      .option("subscribe", prop.getProperty("kafka.topic"))
      .option("kafka.max.partition.fetch.bytes", prop.getProperty("kafka.max.partition.fetch.bytes"))
      .option("kafka.max.poll.records", prop.getProperty("kafka.max.poll.records"))
      .load()
      .selectExpr("CAST(value AS STRING) as message")
      .select(functions.from_json(functions.col("message"),schema).as("json"))
      .select("json.*")
      .as(Encoders.bean(VideoEventData.class));

我想执行我的逻辑,因为我有一个尺寸n

的精确数据集。

在开箱即用的火花结构化流(和一般火花)中不可能。

您有以下选项:

  1. 使用Kafka消费者属性配置坐在Kafka源后面的Kafka消费者。

  2. 作为任意状态汇总的一部分,缓冲行

  3. 写一个自定义源以处理缓冲本身。

for 2.我可以使用keyValueGroupedDataset.flatmapgroupswithstate,其状态会累积在"块"上,最终将为您提供大小n。

用于3.实现自定义状态流源,该源将以getOffset的方式仅在N行时给出getOffsetgetBatch

免责声明:我以前从未做过任何一种解决方案,但它们看起来可行。

您可以通过配置Kafka消费者本身来做到这一点。将fetch.min.bytes设置为您想要拥有的最小值。这会告诉Kafka等到拥有足够的数据为止。

有一个相关的设置fetch.max.wait.ms,可以控制Kafka最多需要多长时间。默认情况下,此值为500毫秒。您可以在此处阅读更多。

相关内容

  • 没有找到相关文章

最新更新