Apache Storm:如何从Kafka Spout微批处理事件



如何在 kafka 喷口中微批处理事件以减少随后螺栓中的 IO 调用? 期望是:使用 kafka 中的事件发出最大大小为 100 的批处理,但最多等待 1 秒以形成此批处理。如果 1 秒内没有足够的事件,请发出可用事件。

我可以通过"source.groupedWithin"方法在 Akka 中实现相同的目标。我如何对卡夫卡喷口做同样的事情?

查看 Storm 的刻度元组,它提供了一种将计划的元组(刻度)发送到您的 bolt 的方法。 对于您的情况,您可以配置每秒一个刻度。 同时,bolt 将简单地处理来自 Kafka 喷口的元组并对其进行批处理,当它达到 100 条消息(在您的情况下)或当您获得一个滴答元组时发送批处理。 请注意,您确实需要检查每个输入元组,以查看它是刻度还是 Kafka 消息。

除了 Chris 的回答,您还可以使用 Storm 的窗口功能 https://storm.apache.org/releases/2.0.0/Windowing.html。您可以在 https://github.com/apache/storm/blob/master/examples/storm-starter/src/jvm/org/apache/storm/starter/SlidingWindowTopology.java

如果您愿意,也可以为此使用三叉戟。设置KafkaTridentSpoutOpaque后,可以使用 Kafka 客户端设置来控制每个批处理中的消息数。您可以使用KafkaSpoutConfigpollTimeoutMs来设置等待批处理填充的时间,并通过KafkaSpoutConfig.Builder.setProp设置max.poll.recordsKafka 客户端配置以控制批处理中的最大记录数。

有关使用 Kafka 三叉戟喷口的完整示例,请参阅 https://github.com/apache/storm/blob/master/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientTopologyNamedTopics.java

最新更新