我使用的是apache storm core 0.9.6(它很旧,由于遗留问题,无法对此采取任何措施(KafkaSpout
。如果我能记录元组偏移量,那么调试我面临的一些反序列化问题将非常有帮助。
到目前为止,我已经看到storm.kafka.KafkaUtils
有两种方法:
public static long getOffset(SimpleConsumer consumer, String topic, int partition, KafkaConfig config)
public static long getOffset(SimpleConsumer consumer, String topic, int partition, long startOffsetTime)
其中第一个是围绕第二个的包装物。在我看来,我唯一不知道如何调用这个函数的是consumer
。我已经阅读了KafkaSpout
代码,但还不知道如何从中获得卡夫卡消费者。
如果您查看了storm-kafka代码,您可以在获取消息后执行任何需要的日志记录https://github.com/apache/storm/blob/v0.9.6/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java#L162.你可以自己构建storm kafka的修改版本,并将你的构建工具指向你自己的副本。