我正在使用Kafka 0.8.1和Kafka python-0.9.0。在我的设置中,我有2个kafka代理设置。当我运行kafka消费者时,我可以看到它从队列中检索消息并跟踪两个代理的偏移量。一切都很好!
我的问题是,当我重新启动消费者时,它开始从头开始消费消息。我所期望的是,在重新启动时,消费者将从它死亡前离开的地方开始消费消息。
我确实尝试在Redis中跟踪消息偏移,然后调用消费者。在从队列中读取消息之前进行seek操作,以确保我只获取以前没有见过的消息。虽然这是有效的,但在部署这个解决方案之前,我想和你们一起检查一下……也许我对Kafka或python-Kafka客户端有一些误解。似乎消费者能够从停止的地方重新开始阅读是非常基本的功能。
谢谢!
小心kafka-python库。它有一些小问题。
如果速度对你的用户来说不是问题,你可以在每条消息中设置自动提交。应该可以的。
SimpleConsumer提供了一个seek
方法(https://github.com/mumrah/kafka-python/blob/master/kafka/consumer/simple.py#L174-L185),它允许你在任何你想要的点开始消费消息。
最常见的调用是:
-
consumer.seek(0, 0)
从队列开头开始读取。 -
consumer.seek(0, 1)
从当前偏移量开始读取。 -
consumer.seek(0, 2)
跳过所有待处理的消息并开始只读取新消息。
第一个参数是这些位置的偏移量。这样,如果调用consumer.seek(5, 0)
,您将跳过队列中的前5条消息。
另外,不要忘记,偏移量是为消费者组存储的。
kafka-python将偏移量存储在kafka服务器上,而不是单独的zookeeper连接上。不幸的是,在apache kafka 0.8.1.1之前,kafka服务器支持提交/读取偏移量的api还没有完全实现。如果你升级了kafka服务器,你的设置应该可以工作。我还建议将kafka-python升级到0.9.4。
(kafka-python维护者)首先,您需要设置一个group_id,记录偏移量,以便它将继续消费来自该group_id
的消息。
如果您已经使用了组中的所有现有消息,那么您希望再次重新使用这些消息。您可以使用seek
来实现这一点。
下面是一个例子:
def test_consume_from_offset(offset):
topic = 'test'
consumer = KafkaConsumer(bootstrap_servers=broker_list, group_id='test')
tp = TopicPartition(topic=topic, partition=0)
consumer.assign([tp])
consumer.seek(tp, offset) # you can set the offset you want to resume from.
for msg in consumer:
# the msg begins with the offset you set
print(msg)
test_consume_from_offset(10)
Kafka消费者能够在Zookeeper中存储偏移量。在Java API中,我们有两种选择——高级消费者,它为我们管理状态,并在重启后从它离开的地方开始消费,以及无状态的低级消费者,没有这个超能力。
根据我对Python消费者代码(https://github.com/mumrah/kafka-python/blob/master/kafka/consumer.py)的理解,SimpleConsumer和MultiProcessConsumer都是有状态的,并跟踪Zookeeper中的当前偏移量,所以你有这个重复消费问题是很奇怪的。
确保在重启时使用相同的消费者组id(可能是随机设置的?)并检查以下选项:
auto_commit: default True. Whether or not to auto commit the offsets auto_commit_every_n: default 100. How many messages to consume before a commit auto_commit_every_t: default 5000. How much time (in milliseconds) to wait before commit
可能是你消费了<100条信息或<5000毫秒?
你只需要确保你的Kafka消费者从最近的偏移量(auto.offset.reset="latest"
)开始读取。还要确保您定义了一个消费者组,以便可以提交偏移量,并且当消费者下降时可以选择其最后提交的位置。
使用confluent-kafka-python
from confluent_kafka import Consumer
c = Consumer({
'bootstrap.servers': 'localhost:9092',
'group.id': 'mygroup',
'auto.offset.reset': 'latest'
})
c.subscribe(['my_topic'])
使用kafka-python
from kafka import KafkaConsumer
consumer = KafkaConsumer(
'my_topic',
bootstrap_servers=['localhost:9092'],
auto_offset_reset='latest',
enable_auto_commit=True,
group_id='mygroup'
)