我正在使用kafka Connect版本confluentinc/cp-kafka-connect:5.1.1-1
。和kafka群集kafka_2.11-0.11.0.3
(3个经纪人(
这个Kafka群集与旧生产商/消费者使用-Spark -stream。
现在我尝试添加kafka connect,并收到以下错误:
ERROR Uncaught exception in herder work thread, exiting:
(org.apache.kafka.connect.runtime.distributed.DistributedHerder)
org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition kc-offsets-22 could be determined
我可以看到这个主题存在。我什至可以使用以下命令写入并阅读此主题:
./kafka-console-producer.sh
--broker-list `hostname`:9092
--topic kc-offsets
--property "parse.key=true"
./kafka-console-consumer.sh --zookeeper $KAFKA_ZOOKEEPER_CONNECT --topic kc-offsets --from-beginning --property print.key=true
Kafka Connect机器与我所有的经纪人都有连接。
,但由于某种原因,Kafka Connect不会启动。
我真的很感谢如何调查/解决它的任何建议
更新:我试图将复制品更改为1,但这并没有帮助
错误表明某些记录以比客户端发送的速度快于队列。
当您的生产者(在这种情况下Kafka连接(发送消息时,它们会存储在缓冲区中(在发送到目标经纪人之前(并将记录分组为批处理以增加吞吐量。当将新记录添加到批处理中时,必须在由request.timeout.ms
控制的 - 可配置的时间窗口中发送(默认设置为30秒(。如果批处理在队列中持续更长的时间,则抛出TimeoutException
,然后将批次记录从队列中删除,并且不会交付给经纪人。
增加request.timeout.ms
的值应该为您带来技巧。
如果这不起作用,您也可以尝试减少batch.size
,以便更频繁地发送批次(但这一次将包含更少的消息(,并确保将linger.ms
设置为0(这是默认值(。
如果您仍然遇到错误,我认为您的网络正在发生一些错误。您是否启用了SSL?
我找到了该问题的根本原因,我们有一个损坏的 __ commuter_offsets 主题。在Kafka-connect之前,我们使用了旧式消费者,因此我们没有看到这个问题。我们的解决方案是创建新的Kafka群集,这解决了问题。
btw要看到主题正在奏效,只需要从它中阅读:
./kafka-console-consumer.sh --zookeeper $KAFKA_ZOOKEEPER_CONNECT --topic __consumer_offsetss --from-beginning --property print.key=true