Kafka Consumer:如何在Python中从最后一条消息开始消费



我正在使用Kafka 0.8.1和Kafka python-0.9.0。在我的设置中,我有2个kafka代理设置。当我运行kafka消费者时,我可以看到它从队列中检索消息并跟踪两个代理的偏移量。一切都很好!

我的问题是,当我重新启动消费者时,它开始从头开始消费消息。我所期望的是,在重新启动时,消费者将从它死亡前离开的地方开始消费消息。

我确实尝试在Redis中跟踪消息偏移,然后调用消费者。在从队列中读取消息之前进行seek操作,以确保我只获取以前没有见过的消息。虽然这是有效的,但在部署这个解决方案之前,我想和你们一起检查一下……也许我对Kafka或python-Kafka客户端有一些误解。似乎消费者能够从停止的地方重新开始阅读是非常基本的功能。

谢谢!

小心kafka-python库。它有一些小问题。

如果速度对你的用户来说不是问题,你可以在每条消息中设置自动提交。应该可以的。

SimpleConsumer提供了一个seek方法(https://github.com/mumrah/kafka-python/blob/master/kafka/consumer/simple.py#L174-L185),它允许你在任何你想要的点开始消费消息。

最常见的调用是:

  • consumer.seek(0, 0)从队列开头开始读取。
  • consumer.seek(0, 1)从当前偏移量开始读取。
  • consumer.seek(0, 2)跳过所有待处理的消息并开始只读取新消息。

第一个参数是这些位置的偏移量。这样,如果调用consumer.seek(5, 0),您将跳过队列中的前5条消息。

另外,不要忘记,偏移量是为消费者组存储的。

kafka-python将偏移量存储在kafka服务器上,而不是单独的zookeeper连接上。不幸的是,在apache kafka 0.8.1.1之前,kafka服务器支持提交/读取偏移量的api还没有完全实现。如果你升级了kafka服务器,你的设置应该可以工作。我还建议将kafka-python升级到0.9.4。

(kafka-python维护者)

首先,您需要设置一个group_id,记录偏移量,以便它将继续消费来自该group_id的消息。

如果您已经使用了组中的所有现有消息,那么您希望再次重新使用这些消息。您可以使用seek来实现这一点。

下面是一个例子:

def test_consume_from_offset(offset):
    topic = 'test'
    consumer = KafkaConsumer(bootstrap_servers=broker_list, group_id='test')
    tp = TopicPartition(topic=topic, partition=0)
    consumer.assign([tp])
    consumer.seek(tp, offset)   # you can set the offset you want to resume from.
    for msg in consumer:
        # the msg begins with the offset you set
        print(msg)
test_consume_from_offset(10)

Kafka消费者能够在Zookeeper中存储偏移量。在Java API中,我们有两种选择——高级消费者,它为我们管理状态,并在重启后从它离开的地方开始消费,以及无状态的低级消费者,没有这个超能力。

根据我对Python消费者代码(https://github.com/mumrah/kafka-python/blob/master/kafka/consumer.py)的理解,SimpleConsumer和MultiProcessConsumer都是有状态的,并跟踪Zookeeper中的当前偏移量,所以你有这个重复消费问题是很奇怪的。

确保在重启时使用相同的消费者组id(可能是随机设置的?)并检查以下选项:

auto_commit: default True. Whether or not to auto commit the offsets
auto_commit_every_n: default 100. How many messages to consume
                     before a commit
auto_commit_every_t: default 5000. How much time (in milliseconds) to
                     wait before commit

可能是你消费了<100条信息或<5000毫秒?

你只需要确保你的Kafka消费者从最近的偏移量(auto.offset.reset="latest")开始读取。还要确保您定义了一个消费者组,以便可以提交偏移量,并且当消费者下降时可以选择其最后提交的位置。


使用confluent-kafka-python

from confluent_kafka import Consumer

c = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'mygroup',
    'auto.offset.reset': 'latest'
})
c.subscribe(['my_topic'])

使用kafka-python

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'my_topic', 
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='latest', 
    enable_auto_commit=True,
    group_id='mygroup'
)

最新更新