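I'm trying to publish a message to a Kafka topic from Python, but I get an error. I can connect and publish through the CLI. I'd appreciate some guidance. I've already googled and read the documentation. Thanks!!
Command-line invocation that works: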
kafka-console-producer --broker-list 123.45.67.891:1234,123.45.67.892:1234,123.45.67.893:1234 --producer.config C:\Users\fake_user\Kafka\client-ssl.properties --topic FakeTopic
Contents of client-ssl.properties:
security.protocol = SSL
ssl.truststore.location = C:/Users/fake_user/Kafka/kafka-truststore
ssl.truststore.password = fakepass
Code:
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['123.45.67.891:1234', '123.45.67.892:1234', '123.45.67.893:1234'],
                         security_protocol='SSL',
                         ssl_certfile=r'C:\Users\fake_user\Kafka\kafka-truststore',
                         ssl_password='fakepass')
producer.send('FakeTopic', value='python_test', key='test')
Resulting error:
Traceback (most recent call last):
  File "kafka_post_test.py", line 6, in <module>
    ssl_password='fakepass')
  File "C:\Users\fake_user\AppData\Local\Programs\Python\Python37-32\lib\site-packages\kafka\producer\kafka.py", line 381, in __init__
    **self.config)
  File "C:\Users\fake_user\AppData\Local\Programs\Python\Python37-32\lib\site-packages\kafka\client_async.py", line 239, in __init__
    self.config['api_version'] = self.check_version(timeout=check_timeout)
  File "C:\Users\fake_user\AppData\Local\Programs\Python\Python37-32\lib\site-packages\kafka\client_async.py", line 874, in check_version
    version = conn.check_version(timeout=remaining, strict=strict, topics=list(self.config['bootstrap_topics_filter']))
  File "C:\Users\fake_user\AppData\Local\Programs\Python\Python37-32\lib\site-packages\kafka\conn.py", line 1078, in check_version
    if not self.connect_blocking(timeout_at - time.time()):
  File "C:\Users\fake_user\AppData\Local\Programs\Python\Python37-32\lib\site-packages\kafka\conn.py", line 331, in connect_blocking
    self.connect()
  File "C:\Users\fake_user\AppData\Local\Programs\Python\Python37-32\lib\site-packages\kafka\conn.py", line 420, in connect
    if self._try_handshake():
  File "C:\Users\fake_user\AppData\Local\Programs\Python\Python37-32\lib\site-packages\kafka\conn.py", line 496, in _try_handshake
    self._sock.do_handshake()
  File "C:\Users\fake_user\AppData\Local\Programs\Python\Python37-32\lib\ssl.py", line 1117, in do_handshake
    self._sslobj.do_handshake()
ssl.SSLCertVerificationError: [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed: self signed certificate in certificate chain (_ssl.c:1051)
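Take a look at this link.
You have to add the SSL certificate to the JVM keystore for any program that runs on Java.
I found that, by default, the kafka-python library sets the ssl_cafile property to None. Setting it to the operating system's default CA file (/etc/pki/tls/cert.pem on Linux) allowed me to connect to the Kafka broker.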
https://kafka-python.readthedocs.io/en/master/_modules/kafka/producer/kafka.html#KafkaProducer.send
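The ssl_cafile approach from the answer above could look roughly like the sketch below. It is a minimal illustration under assumptions, not a verified configuration: build_ssl_producer_config and make_producer are hypothetical helper names, and the CA path must point to a PEM file containing the CA that signed the broker certificates. For the self-signed chain in the question, that probably means exporting the CA certificate from the truststore to PEM first, because Python's ssl module does not read Java keystore files. The serializers are added because kafka-python expects bytes for key and value unless a serializer is supplied.

```python
import ssl


def build_ssl_producer_config(bootstrap_servers, cafile=None):
    """Build keyword arguments for kafka.KafkaProducer over SSL.

    Hypothetical helper: `cafile` is assumed to be a PEM file holding the
    CA certificate(s) used to verify the brokers.
    """
    if cafile is None:
        # Fall back to the CA file OpenSSL uses by default on this system.
        # This can still be None if no such file exists.
        cafile = ssl.get_default_verify_paths().cafile
    return {
        "bootstrap_servers": list(bootstrap_servers),
        "security_protocol": "SSL",
        # CA bundle used to verify the broker certificate chain.
        "ssl_cafile": cafile,
        # Encode str keys and values to bytes before sending.
        "key_serializer": str.encode,
        "value_serializer": str.encode,
    }


def make_producer(config):
    """Create the producer; this opens a connection to the brokers."""
    # Imported here so the config helper works without kafka-python installed.
    from kafka import KafkaProducer

    return KafkaProducer(**config)
```

With a reachable cluster, usage would be along the lines of producer = make_producer(build_ssl_producer_config(['123.45.67.891:1234'], cafile='/etc/pki/tls/cert.pem')), followed by producer.send('FakeTopic', value='python_test', key='test') and producer.flush().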