我尝试使用以下命令读取kafka consumer中的消息:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
这里我们可以读取大约4天的旧消息,因为我们在kafka服务器配置文件中设置了保留时间为7天。但是,当我们尝试使用KafkaConsumer的kaka-python客户端库读取消息时,如下所示:
cons = KafkaConsumer("localhost:9092", "test","smallest")
cons.fetch_messages()
我们得到今天的消息只是有一些偏移。我不知道如何获得Kafka中可用的最古老的消息,就像我们在上面的Kafka消费者shell脚本中得到的那样。请帮助。
文档显示通过namedtuples
传入的配置。
consumer = KafkaConsumer('topic1', 'topic2',
bootstrap_servers=['localhost:9092'],
group_id='my_consumer_group',
auto_commit_enable=True,
auto_commit_interval_ms=30 * 1000,
auto_offset_reset='smallest')