使用Kafka -python Kafka客户端的KafkaConsumer实例读取Kafka中最古老的可用消息



我尝试使用以下命令读取kafka consumer中的消息:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

这里我们可以读取大约4天的旧消息,因为我们在kafka服务器配置文件中设置了保留时间为7天。但是,当我们尝试使用KafkaConsumer的kaka-python客户端库读取消息时,如下所示:

cons = KafkaConsumer("localhost:9092", "test","smallest")
cons.fetch_messages()

我们得到今天的消息只是有一些偏移。我不知道如何获得Kafka中可用的最古老的消息,就像我们在上面的Kafka消费者shell脚本中得到的那样。请帮助。

文档显示通过namedtuples传入的配置。

consumer = KafkaConsumer('topic1', 'topic2',
                         bootstrap_servers=['localhost:9092'],
                         group_id='my_consumer_group',
                         auto_commit_enable=True,
                         auto_commit_interval_ms=30 * 1000,
                         auto_offset_reset='smallest')

最新更新