如何通过Spark从Kafka获取至少N条日志?



在Spark流媒体中,我在日志到达时获得日志。但我想在一次传递中得到至少N个对数。怎样才能实现呢?

从这个答案来看,似乎在Kafka中有这样一个实用程序,但似乎没有在Spark中出现以使其成为可能。

没有选项允许您为从Kafka接收的消息数量设置最小值。选项maxOffsetsPerTrigger允许您设置消息的最大值

如果你想让你的微批处理一次处理更多的消息,你可以考虑增加触发间隔。

同样(参考你提供的链接),这也是不可能在Kafka本身设置的。您可以设置最小提取字节数,但不能设置最小消息数。

注意,您可以通过结构化流中的readStream通过前缀kafka.设置所有Kafka选项,如Kafka特定配置:

一节所述

"Kafka自己的配置可以通过DataStreamReader设置。选项与kafka。前缀,例如stream.option("kafka.bootstrap.servers", "host:port").">

这样,您还可以使用消费者配置kafka.fetch.min.bytes。然而,用Spark 3.0.1在本地Kafka 2.5.0安装上测试它并没有任何影响。当添加配置kafka.fetch.max.wait.ms时,我的测试中的获取时间确实发生了变化,但不是以可预测的方式(至少对我来说)。

查看Spark的KafkaDataConsumer的源代码,看起来与纯KafkaConsumer相比,fetch没有直接说明任何min/max字节。

最新更新