在Spark流媒体中,我在日志到达时获得日志。但我想在一次传递中得到至少N个对数。怎样才能实现呢?
从这个答案来看,似乎在Kafka中有这样一个实用程序,但似乎没有在Spark中出现以使其成为可能。
没有选项允许您为从Kafka接收的消息数量设置最小值。选项maxOffsetsPerTrigger
允许您设置消息的最大值。
如果你想让你的微批处理一次处理更多的消息,你可以考虑增加触发间隔。
同样(参考你提供的链接),这也是不可能在Kafka本身设置的。您可以设置最小提取字节数,但不能设置最小消息数。
注意,您可以通过结构化流中的readStream通过前缀kafka.
设置所有Kafka选项,如Kafka特定配置:
"Kafka自己的配置可以通过DataStreamReader设置。选项与kafka。前缀,例如stream.option("kafka.bootstrap.servers", "host:port").">
这样,您还可以使用消费者配置kafka.fetch.min.bytes
。然而,用Spark 3.0.1在本地Kafka 2.5.0安装上测试它并没有任何影响。当添加配置kafka.fetch.max.wait.ms
时,我的测试中的获取时间确实发生了变化,但不是以可预测的方式(至少对我来说)。
查看Spark的KafkaDataConsumer的源代码,看起来与纯KafkaConsumer相比,fetch
没有直接说明任何min/max字节。