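I am using a Python Kafka producer to write to a topic. The Kafka server is external, and I connect to it with Kerberos credentials. When I run the producer script, a connection to the server appears to be established, but for some reason the metadata request fails right after "Request 1: MetadataRequest_v1(topics=NULL)".
Here is the producer code: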
from kafka import KafkaProducer
broker = ['mybroker.com:9092']
p = KafkaProducer(bootstrap_servers=broker,security_protocol='SSL',ssl_cafile="truststore.pem",ssl_keyfile='truststore.jks',sasl_mechanism='GSSAPI',api_version=(0, 10, 1))
topic = 'test-topic'
# Write hello world to test topic
p.send(topic, bytes("Hello World", 'utf-8'))
p.flush()
Here is the error I currently get:
DEBUG:kafka.producer.kafka:Starting the Kafka producer
DEBUG:kafka.metrics.metrics:Added sensor with name connections-closed
DEBUG:kafka.metrics.metrics:Added sensor with name connections-created
DEBUG:kafka.metrics.metrics:Added sensor with name select-time
DEBUG:kafka.metrics.metrics:Added sensor with name io-time
INFO:kafka.client:Bootstrapping cluster metadata from [('fakehost.lef.ceg.com', 9092, <AddressFamily.AF_UNSPEC: 0>)]
DEBUG:kafka.client:Attempting to bootstrap via node at mybroker.com:9092
DEBUG:kafka.metrics.metrics:Added sensor with name bytes-sent-received
DEBUG:kafka.metrics.metrics:Added sensor with name bytes-sent
DEBUG:kafka.metrics.metrics:Added sensor with name bytes-received
DEBUG:kafka.metrics.metrics:Added sensor with name request-latency
DEBUG:kafka.metrics.metrics:Added sensor with name node-bootstrap.bytes-sent
DEBUG:kafka.metrics.metrics:Added sensor with name node-bootstrap.bytes-received
DEBUG:kafka.metrics.metrics:Added sensor with name node-bootstrap.latency
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap host=:)>: creating new socket
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap host=:)=9092>: setting socket option (6, 1, 1)
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap host=:)/iphere port=9092>: configuring default SSL Context
INFO:kafka.conn:<BrokerConnection node_id=bootstrap host=:)/iphere port=9092>: Loading SSL CA from /truststore.pem
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap host=:)/iphere port=9092>: wrapping socket in ssl context
INFO:kafka.conn:<BrokerConnection node_id=bootstrap host=:)/iphere port=9092>: connecting to iphere
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap host=:)/iphere port=9092>: established TCP connection
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap host=:)/iphere port=9092>: initiating SSL handshake
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap host=:)/iphere port=9092>: completed SSL handshake.
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap host=:)/iphere port=9092>: Connection complete.
DEBUG:kafka.client:Node bootstrap connected
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap host=:)/iphere port=9092> Request 1: MetadataRequest_v1(topics=NULL)
ERROR:kafka.conn:<BrokerConnection node_id=bootstrap host=:)/iphere port=9092>: socket disconnected
INFO:kafka.conn:<BrokerConnection node_id=bootstrap host=:)/iphere port=9092>: Closing connection. ConnectionError: socket disconnected
DEBUG:kafka.conn:<BrokerConnection node_id=bootstrap host=:)/iphere port=9092>: reconnect backoff 0.04487732169774206 after 1 failures
ERROR:kafka.client:Unable to bootstrap from [('mybroker.com', 9092, <AddressFamily.AF_UNSPEC: 0>)]
DEBUG:kafka.metrics.metrics:Added sensor with name bufferpool-wait-time
DEBUG:kafka.metrics.metrics:Added sensor with name batch-size
DEBUG:kafka.metrics.metrics:Added sensor with name compression-rate
DEBUG:kafka.metrics.metrics:Added sensor with name queue-time
DEBUG:kafka.metrics.metrics:Added sensor with name produce-throttle-time
DEBUG:kafka.metrics.metrics:Added sensor with name records-per-request
DEBUG:kafka.metrics.metrics:Added sensor with name bytes
DEBUG:kafka.metrics.metrics:Added sensor with name record-retries
DEBUG:kafka.metrics.metrics:Added sensor with name errors
DEBUG:kafka.metrics.metrics:Added sensor with name record-size-max
DEBUG:kafka.producer.sender:Starting Kafka producer I/O thread.
DEBUG:kafka.client:Give up sending metadata request since no node is available
DEBUG:kafka.producer.kafka:Kafka producer started
DEBUG:kafka.producer.kafka:Requesting metadata update for topic test-topic
DEBUG:kafka.client:Give up sending metadata request since no node is available
DEBUG:kafka.client:Give up sending metadata request since no node is available
DEBUG:kafka.client:Give up sending metadata request since no node is available
DEBUG:kafka.client:Give up sending metadata request since no node is available
DEBUG:kafka.client:Give up sending metadata request since no node is available
DEBUG:kafka.client:Give up sending metadata request since no node is available
DEBUG:kafka.client:Give up sending metadata request since no node is available
DEBUG:kafka.client:Give up sending metadata request since no node is available
DEBUG:kafka.client:Give up sending metadata request since no node is available
DEBUG:kafka.client:Give up sending metadata request since no node is available
DEBUG:kafka.client:Give up sending metadata request since no node is available
DEBUG:kafka.client:Give up sending metadata request since no node is available
DEBUG:kafka.client:Give up sending metadata request since no node is available
DEBUG:kafka.client:Give up sending metadata request since no node is available
DEBUG:kafka.client:Give up sending metadata request since no node is available
DEBUG:kafka.client:Give up sending metadata request since no node is available
DEBUG:kafka.client:Give up sending metadata request since no node is available
DEBUG:kafka.client:Give up sending metadata request since no node is available
DEBUG:kafka.client:Give up sending metadata request since no node is available
DEBUG:kafka.client:Give up sending metadata request since no node is available
DEBUG:kafka.client:Give up sending metadata request since no node is available
DEBUG:kafka.client:Give up sending metadata request since no node is available
DEBUG:kafka.client:Give up sending metadata request since no node is available
DEBUG:kafka.client:Give up sending metadata request since no node is available
What exactly does this mean? How can I debug it?
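One detail in the producer configuration above stands out: `security_protocol='SSL'` is combined with `sasl_mechanism='GSSAPI'`. In kafka-python the SASL mechanism is only used when the security protocol is `SASL_PLAINTEXT` or `SASL_SSL`, so with plain `SSL` no Kerberos authentication is attempted. A broker listener that expects SASL may then close the socket right after the TLS handshake, which would be consistent with the "socket disconnected" line in the log. The sketch below is one possible configuration, not a confirmed fix. It assumes the listener on port 9092 is SASL_SSL, that the Kerberos service name is `kafka`, that the `gssapi` Python package is installed, and that a valid ticket already exists (for example from `kinit`). The helper names `build_kerberos_config` and `make_producer` are hypothetical.

```python
def build_kerberos_config(broker, cafile, service_name="kafka"):
    """Return KafkaProducer keyword arguments for Kerberos (GSSAPI) over TLS.

    Assumption: the broker listener uses SASL_SSL and the Kerberos
    service name is 'kafka'. Adjust both to match the cluster.
    """
    return {
        "bootstrap_servers": [broker],
        # SASL_SSL = TLS transport plus SASL authentication
        "security_protocol": "SASL_SSL",
        "sasl_mechanism": "GSSAPI",
        "sasl_kerberos_service_name": service_name,
        # CA certificate in PEM format, used to verify the broker
        "ssl_cafile": cafile,
        "api_version": (0, 10, 1),
    }


def make_producer(config):
    """Create the producer; kafka-python is imported lazily here so the
    config helper above can be inspected without a broker or the library."""
    from kafka import KafkaProducer
    return KafkaProducer(**config)


# Example usage (requires a reachable broker and a valid Kerberos ticket):
# producer = make_producer(build_kerberos_config("mybroker.com:9092", "truststore.pem"))
# producer.send("test-topic", b"Hello World")
# producer.flush()
```

Note also that `ssl_keyfile` expects a private key in PEM format. Python's `ssl` module cannot read a JKS keystore, so `truststore.jks` would need to be converted first, or the option dropped if the broker does not require a client certificate.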
Use the following code to start a producer in Python:
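from kafka import KafkaProducer
import json

topic = 'testing'
bootstrap_servers = '<your broker>:9092'
producer = KafkaProducer(bootstrap_servers=bootstrap_servers)

# Build the message; replace the payload with your own
msg = json.dumps({"status": True})

# send() is asynchronous and returns a future
future = producer.send(topic, msg.encode('utf-8'))
# Block until the broker acknowledges the message (raises on failure or timeout)
result = future.get(timeout=60)

# Print the producer's client-side metrics
metrics = producer.metrics()
print(metrics)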
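The message format is up to you: the producer sends raw bytes, so you can use any format as long as you encode it to bytes before sending.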
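To illustrate the point about formats, here is a minimal sketch of two serializer functions that turn a value into bytes. The function names `json_serializer` and `text_serializer` are made up for this example. kafka-python's `KafkaProducer` accepts a `value_serializer` callable, so either function could be passed there instead of encoding each message by hand.

```python
import json


def json_serializer(value):
    """Encode a JSON-compatible Python object as UTF-8 bytes."""
    return json.dumps(value).encode("utf-8")


def text_serializer(value):
    """Encode a plain string as UTF-8 bytes."""
    return value.encode("utf-8")


# Example usage (requires a reachable broker):
# from kafka import KafkaProducer
# producer = KafkaProducer(bootstrap_servers='<your broker>:9092',
#                          value_serializer=json_serializer)
# producer.send('testing', {"status": True})
```

With a serializer configured, `send()` receives the Python object directly and the producer applies the function before the message goes on the wire.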