I have a topic with an Avro schema. I produce messages to it from Python code, and that works perfectly fine. When I consume the messages from the CLI, I can read them successfully without any errors.
But when I try to consume through Python code, it prints 'None'; essentially it tries to read but gets nothing back. When I print the consumer position for debugging, it shows an offset of '-1001'.
The purpose of the method is to read all of the latest messages, build a list from them, and return that list.
Note: I also tried setting 'enable.auto.commit' = True, but it made no difference, so I removed it from my config.
Library in requirements.txt: confluent-kafka[avro]>=1.4.2
conf = {
    'bootstrap.servers': 'dummyvalue',
    'security.protocol': 'dummyvalue',
    'sasl.mechanism': 'PLAIN',
    'sasl.username': 'dummyvalue',
    'sasl.password': 'dummyvalue',
    'session.timeout.ms': 45000,
    'schema.registry.url': 'dummyvalue',
    'basic.auth.credentials.source': 'dummyvalue',
    'basic.auth.user.info': 'dummyvalue',
    'use.latest.version': True
}

schema_registry_conf = {
    'url': conf['schema.registry.url'],
    'basic.auth.user.info': conf['basic.auth.user.info']
}
def _set_consumer_config(self, conf, avro_deserializer):
    # The method below removes the schema-registry-only keys from the main
    # conf dictionary, so consumer_conf contains only consumer-relevant properties.
    consumer_conf = self._popSchemaRegistryParamsFromConfig(conf)
    consumer_conf['value.deserializer'] = avro_deserializer
    consumer_conf['group.id'] = "python_example"
    consumer_conf['auto.offset.reset'] = 'latest'
    return consumer_conf
def get_list_of_unconsumed_msgs(self, topic):
    text_file = open('avro schema file path')
    avro_schema = text_file.read()
    schema_registry_client = SchemaRegistryClient(schema_registry_conf)
    avro_deserializer = AvroDeserializer(schema_registry_client, avro_schema)
    consumer = DeserializingConsumer(self._set_consumer_config(conf, avro_deserializer))
    consumer.subscribe([topic])
    messages = []
    polling_count = 5
    while polling_count >= 1:
        try:
            print(consumer.position([TopicPartition(topic, 0)]))
            print(f"Consumer Committed {consumer.committed([TopicPartition(topic, 0)])}")
            print(f"Consumer Assignment {consumer.assignment()}")
            msg = consumer.poll(3.0)
            if msg is None:
                polling_count = polling_count - 1
                continue
            elif msg.error():
                print('error: {}'.format(msg.error()))
            else:
                messages.append([msg.value()])
        except SerializerError as e:
            # Report malformed record, discard results, continue polling
            print("Message deserialization failed {}".format(e))
    consumer.close()
    return messages
def main():
    msg = {}
    topic_name = "aa_automation_test"
    msg = obj.get_list_of_unconsumed_msgs(topic_name)
    print(f"Received Message as :- {msg}")
Output of the print statements:
[Prints an empty list; for debugging I printed the offset and it shows -1001]
[TopicPartition{topic=aa_automation_test,partition=0,offset=-1001,error=None}]
Consumer Committed [TopicPartition{topic=aa_automation_test,partition=0,offset=-1001,error=None}]
Consumer Assignment []
Received Message as :- []
What you are seeing are the default values; -1001 is the OFFSET_INVALID sentinel, meaning no offset is known yet.
There will not be any data until you actually poll
and connect to the broker.
Kafka itself keeps track of uncommitted offsets. You do not need to implement that logic in your application.
If you end up printing an empty list then, from what you have shown, you have reached the end of the topic: with 'auto.offset.reset' set to 'latest' and a brand-new group.id, the consumer starts at the end of each partition and only sees messages produced after it subscribes.
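A minimal sketch of the config change, assuming you want to replay messages that already exist on the topic rather than only new ones:

```python
# Hedged sketch: with a *new* group.id (no committed offsets yet),
# 'earliest' makes the consumer start from the beginning of each
# partition instead of the end.
consumer_conf['auto.offset.reset'] = 'earliest'
# Alternatively, keep 'latest' but make sure the producer sends messages
# *after* the consumer has subscribed and received its partition assignment.
```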
To pull up to 5 messages at a time, see consume(num_messages).
To check a partition's (end) offset, use get_watermark_offsets;
you can subtract the consumer.committed()
offset from it to see the lag.
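The lag arithmetic can be sketched as below. The helper name `committed_lag` is my own; `get_watermark_offsets` and `committed` (shown in the usage comment) are real confluent-kafka Consumer methods:

```python
OFFSET_INVALID = -1001  # confluent_kafka.OFFSET_INVALID sentinel

def committed_lag(high_watermark, committed_offset):
    """How far the committed offset is behind the end of the partition.

    A committed offset of OFFSET_INVALID (-1001) means nothing has been
    committed yet, so the whole partition is still unconsumed.
    """
    if committed_offset == OFFSET_INVALID:
        return high_watermark
    return high_watermark - committed_offset

# Usage against a live consumer (requires a broker connection):
#   tp = TopicPartition(topic, 0)
#   low, high = consumer.get_watermark_offsets(tp, timeout=5.0)
#   committed = consumer.committed([tp], timeout=5.0)[0].offset
#   print(committed_lag(high, committed))
```

In your output, committed is -1001 and the assignment is empty, so the lag is the entire partition: nothing has been consumed or committed by this group yet.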