设置使用者偏移



如果我想从中获取所有消息以启动偏移量,我会运行以下shell命令:

/usr/share/kafka/bin/kafka-console-consumer.sh 
--bootstrap-server localhost:9092 
--topic 1-codicefiscale-21032022122736-i 
--partition 0 
--offset 5

如何使用kafka python库进行同样的操作?

def demoListMessages(topicName):
consumer = KafkaConsumer(topicName, 
bootstrap_servers="localhost:9092", 
auto_offset_reset='earliest',
consumer_timeout_ms=1000)
for msg in consumer:
print(msg.value)
demoListMessages("1-codicefiscale-21032022122736-i")

我必须设置分区。这个片段对我有用。

def demoListPageMessages(topicName):
consumer = KafkaConsumer(bootstrap_servers="localhost:9092",auto_offset_reset='earliest',consumer_timeout_ms=1000)
tp = TopicPartition(topicName, 0)
consumer.assign([tp])
consumer.seek_to_beginning()
consumer.seek(tp, 5)
for msg in consumer:
print(msg.value)

相关内容

最新更新