正确寻找和消费卡夫卡关于多分区主题的信息



我最近发现我一直在使用的主题是多分区而不是单分区。我需要重新配置我的使用者类来处理多个分区,但我有点困惑。我目前正在使用一个偏移组,为了下面的例子,让我们称之为test_offset_group。在正常情况下,我总是线性地解析,并在时间上继续前进;当消息被添加到主题中时,我会解析它们并继续前进,但如果出现崩溃或需要返回并重新运行前一天的提要,我需要能够按时间戳进行搜索。Kafka在这个项目中是强制性的,所以我无法更改我正在使用的流数据服务的类型。

我这样配置我的消费者:

test_consumer = KafkaConsumer("test_topic", bootstrap_servers="bootstrap_string", enable_auto_commit=False, group_id="test_offset_group"

如果我需要查找时间戳,我将提供时间戳,然后使用以下方法进行查找:

test_consumer.poll()
tp = TopicPartition("test_topic", 0)
needed_date = datetime.timestamp(timestamp)
rec_in = test_consumer.offsets_for_times({tp: needed_date * 1000})
test_consumer.seek(tp, rec_in[tp].offset)

上面的功能非常适合单个分区的消费者,但当你考虑到多个分区时,这感觉非常笨拙和困难。我想我可以用test_consumer.partitions_for_topic('test_topic")然后迭代它们中的每一个,但同样,这似乎违背了卡夫卡的思想,我觉得应该有一种更简单的方法来做到这一点。

总之:我想了解如何利用offset_group功能在多个分区中寻找大量偏移,我想确认,通过执行上述操作,我实际上忽略了除0之外的所有分区

您正在执行正确的逻辑,只需要在该使用者实例的所有分区上执行即可。

您可以使用assignment()检索当前分配。

最新更新