python合流kafka:组授权失败



在尝试使用python客户端消费Kafka的消息时,如何修复:Group authorization failed错误?

同样的设置在Kafka CLI中也可以正常工作。

通常这个错误应该指向无效权限,但是,由于CLI正常工作,我怀疑它一定是其他原因。

我的kafka实例已启用SASL/SSL。使用以下设置:

src_schema_registry_conf = {
'url': 'http://<<host>>:8081'
}
src_schema_registry_client = SchemaRegistryClient(src_schema_registry_conf)
string_deserializer = StringDeserializer('utf_8')
avro_deserializer = AvroDeserializer(src_schema_registry_client)#,
src_conf = {
"bootstrap.servers": '<<host>>:9093',
"ssl.ca.location": 'certs/catrust.pem',
"security.protocol":"SASL_SSL",
"sasl.mechanism":"PLAIN",
"sasl.username":"<<username>",
"sasl.password":'<<secret_pasword',
'key.deserializer': string_deserializer,
'value.deserializer': avro_deserializer,
'group.id': "test-consumer-group",
'auto.offset.reset': "earliest"
}

消息的消耗是从以下位置启动的:

consumer = DeserializingConsumer(src_conf)
consumer.subscribe(['<<topic>>'])
while True:
try:
# SIGINT can't be handled when polling, limit timeout to 1 second.
msg = consumer.poll(1.0)
if msg is None:
continue
user = msg.value()
if user is not None:
print("record {}: value: {}n"
.format(msg.key(), user))
except KeyboardInterrupt:
break
consumer.close()

我试着按照这里的例子:https://github.com/confluentinc/confluent-kafka-python/blob/master/examples/avro_consumer.py但无法在我的环境中发挥作用。

事实证明,答案比预期的更简单:

用户名必须写在CCD_ 2中。然后它被接受。

答案是API密钥无法访问主题

https://support.confluent.io/hc/en-us/articles/13193806015252-How-to-fix-the-error-Group-authorization-failed-FindCoordinator-response-error-Group-authorization-failed-

最新更新