Kafka 连接无法以"牧民工作线程中未捕获的异常..."开头。位置可以确定



我正在使用kafka Connect版本confluentinc/cp-kafka-connect:5.1.1-1。和kafka群集kafka_2.11-0.11.0.3(3个经纪人(

这个Kafka群集与旧生产商/消费者使用-Spark -stream。

现在我尝试添加kafka connect,并收到以下错误:

ERROR Uncaught exception in herder work thread, exiting:  
(org.apache.kafka.connect.runtime.distributed.DistributedHerder)
org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition kc-offsets-22 could be determined

我可以看到这个主题存在。我什至可以使用以下命令写入并阅读此主题:

./kafka-console-producer.sh 
    --broker-list `hostname`:9092 
    --topic kc-offsets 
    --property "parse.key=true" 
./kafka-console-consumer.sh --zookeeper $KAFKA_ZOOKEEPER_CONNECT --topic kc-offsets --from-beginning --property print.key=true

Kafka Connect机器与我所有的经纪人都有连接。

,但由于某种原因,Kafka Connect不会启动。

我真的很感谢如何调查/解决它的任何建议

更新:我试图将复制品更改为1,但这并没有帮助

错误表明某些记录以比客户端发送的速度快于队列。

当您的生产者(在这种情况下Kafka连接(发送消息时,它们会存储在缓冲区中(在发送到目标经纪人之前(并将记录分组为批处理以增加吞吐量。当将新记录添加到批处理中时,必须在由request.timeout.ms控制的 - 可配置的时间窗口中发送(默认设置为30秒(。如果批处理在队列中持续更长的时间,则抛出TimeoutException,然后将批次记录从队列中删除,并且不会交付给经纪人。

增加request.timeout.ms的值应该为您带来技巧。

如果这不起作用,您也可以尝试减少batch.size,以便更频繁地发送批次(但这一次将包含更少的消息(,并确保将linger.ms设置为0(这是默认值(。

如果您仍然遇到错误,我认为您的网络正在发生一些错误。您是否启用了SSL?

我找到了该问题的根本原因,我们有一个损坏的 __ commuter_offsets 主题。在Kafka-connect之前,我们使用了旧式消费者,因此我们没有看到这个问题。我们的解决方案是创建新的Kafka群集,这解决了问题。

btw要看到主题正在奏效,只需要从它中阅读:

./kafka-console-consumer.sh --zookeeper $KAFKA_ZOOKEEPER_CONNECT --topic __consumer_offsetss --from-beginning --property print.key=true

相关内容

最新更新