Python confluent-kafka DeserializingConsumer does not read from a Kafka topic



I have a topic with an Avro schema. I produce messages to it via Python code and that works perfectly fine. When I consume the messages from the CLI, I can consume them successfully without errors.

When I try to consume through Python code, it prints 'None'; basically it tries to read but gets nothing back. When I print the offset for debugging, it shows -1001.

The purpose of the method is to read all of the latest messages, build a list from them, and return that list.

Note: I also tried setting 'enable.auto.commit' = True, but it did not work, so I removed it from my config.

Library in requirements.txt = confluent-kafka[avro]>=1.4.2

from confluent_kafka import DeserializingConsumer, TopicPartition
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.avro.serializer import SerializerError

conf = {
    'bootstrap.servers': 'dummyvalue',
    'security.protocol': 'dummyvalue',
    'sasl.mechanism': 'PLAIN',
    'sasl.username': 'dummyvalue',
    'sasl.password': 'dummyvalue',
    'session.timeout.ms': 45000,
    'schema.registry.url': 'dummyvalue',
    'basic.auth.credentials.source': 'dummyvalue',
    'basic.auth.user.info': 'dummyvalue',
    'use.latest.version': True
}

schema_registry_conf = {
    'url': conf['schema.registry.url'],
    'basic.auth.user.info': conf['basic.auth.user.info']
}

def _set_consumer_config(self, conf, avro_deserializer):
    # Removes the schema-registry-only settings from the main conf dictionary,
    # so consumer_conf holds only consumer-relevant properties.
    consumer_conf = self._popSchemaRegistryParamsFromConfig(conf)
    consumer_conf['value.deserializer'] = avro_deserializer
    consumer_conf['group.id'] = "python_example"
    consumer_conf['auto.offset.reset'] = 'latest'
    return consumer_conf

def get_list_of_unconsumed_msgs(self, topic):
    with open('avro schema file path') as text_file:
        avro_schema = text_file.read()
    schema_registry_client = SchemaRegistryClient(schema_registry_conf)
    avro_deserializer = AvroDeserializer(schema_registry_client, avro_schema)
    consumer = DeserializingConsumer(self._set_consumer_config(conf, avro_deserializer))
    consumer.subscribe([topic])
    messages = []
    polling_count = 5
    while polling_count >= 1:
        try:
            print(consumer.position([TopicPartition(topic, 0)]))
            print(f"Consumer Committed {consumer.committed([TopicPartition(topic, 0)])}")
            print(f"Consumer Assignment {consumer.assignment()}")
            msg = consumer.poll(3.0)
            if msg is None:
                polling_count = polling_count - 1
                continue
            elif msg.error():
                print('error: {}'.format(msg.error()))
            else:
                messages.append([msg.value()])
        except SerializerError as e:
            # Report malformed record, discard results, continue polling
            print("Message deserialization failed {}".format(e))
    consumer.close()
    return messages

def main():
    topic_name = "aa_automation_test"
    msg = obj.get_list_of_unconsumed_msgs(topic_name)
    print(f"Received Message as :- {msg}")

Output of the print statements:

[Prints an empty list; for debugging I printed the offset and it shows -1001]
[TopicPartition{topic=aa_automation_test,partition=0,offset=-1001,error=None}]
Consumer Committed [TopicPartition{topic=aa_automation_test,partition=0,offset=-1001,error=None}]
Consumer Assignment [] 
Received Message as :- []

-1001 is just the default placeholder (OFFSET_INVALID); that is what you are seeing.

There will be no data until you actually poll() and connect to the broker.
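A minimal sketch of that ordering, using the consumer and topic from your code: subscribe() is lazy, and the group join plus partition assignment only happen inside poll().

consumer.subscribe([topic])
print(consumer.assignment())    # [] -- nothing assigned before the first poll

msg = consumer.poll(10.0)       # first poll connects and triggers the group rebalance

print(consumer.assignment())    # now lists the assigned partitions
# position() is the next offset to fetch; until something has actually been
# fetched it can still report OFFSET_INVALID (-1001)
print(consumer.position([TopicPartition(topic, 0)]))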

Kafka tracks unconsumed offsets on its own; you do not need to implement that logic in your app.

If you print an empty list at the end then, from what you have shown, you have reached the end of the topic: with 'auto.offset.reset' set to 'latest' and a group.id that has never committed an offset, the consumer starts at the end of the partition and only sees messages produced after it subscribes.
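A hedged sketch of the usual fix, reusing the helper from your question; the only change is overriding 'auto.offset.reset' (alternatively, keep 'latest' and make sure the consumer is already polling before you produce):

# Sketch, not a drop-in fix: with a group.id that has no committed offsets,
# 'earliest' replays the topic from the beginning instead of starting at the end.
consumer_conf = self._set_consumer_config(conf, avro_deserializer)
consumer_conf['auto.offset.reset'] = 'earliest'
consumer = DeserializingConsumer(consumer_conf)
consumer.subscribe([topic])
while True:
    msg = consumer.poll(3.0)
    if msg is None:        # nothing arrived within the timeout
        break
    if msg.error():
        print('error: {}'.format(msg.error()))
        continue
    print(msg.value())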

To pull 5 at a time, see consume(num_messages).
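For example (a sketch; note that consume() is the batch API of the plain Consumer, and as far as I know DeserializingConsumer does not implement it, so you would run the AvroDeserializer on the raw payload yourself):

from confluent_kafka import Consumer

raw_consumer = Consumer({
    'bootstrap.servers': 'dummyvalue',   # same placeholders as your conf
    'group.id': 'python_example',
    'auto.offset.reset': 'earliest',
})
raw_consumer.subscribe([topic])
batch = raw_consumer.consume(num_messages=5, timeout=3.0)   # up to 5 Messages
for m in batch:
    if m.error():
        print('error: {}'.format(m.error()))
    else:
        print(m.value())   # raw bytes here, not the deserialized dict
raw_consumer.close()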

To check the partition's (end) offset, see get_watermark_offsets; subtracting consumer.committed() from the high watermark gives you the lag.
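Something like this, using the consumer from your code (the high watermark is the offset that will be assigned to the next produced message):

tp = TopicPartition(topic, 0)
low, high = consumer.get_watermark_offsets(tp, timeout=5.0)
committed = consumer.committed([tp], timeout=5.0)[0].offset
# committed is OFFSET_INVALID (-1001) if the group has never committed
lag = high - committed if committed >= 0 else high - low
print(f"low={low} high={high} committed={committed} lag={lag}")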
