Apache风暴和卡夫卡:我如何获得卡夫卡喷口的消费者对象,以便记录其偏移量



我使用的是apache storm core 0.9.6(它很旧,由于遗留问题,无法对此采取任何措施(KafkaSpout。如果我能记录元组偏移量,那么调试我面临的一些反序列化问题将非常有帮助。

到目前为止,我已经看到storm.kafka.KafkaUtils有两种方法:

public static long getOffset(SimpleConsumer consumer, String topic, int partition, KafkaConfig config) 
public static long getOffset(SimpleConsumer consumer, String topic, int partition, long startOffsetTime) 

其中第一个是围绕第二个的包装物。在我看来,我唯一不知道如何调用这个函数的是consumer。我已经阅读了KafkaSpout代码,但还不知道如何从中获得卡夫卡消费者。

如果您查看了storm-kafka代码,您可以在获取消息后执行任何需要的日志记录https://github.com/apache/storm/blob/v0.9.6/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java#L162.你可以自己构建storm kafka的修改版本,并将你的构建工具指向你自己的副本。

最新更新