>我有一个 Kafka Streams 应用程序从具有 3 个代理和复制因子 3 的 Kafka 集群中消费和生产。除了使用者偏移量主题(50 个分区)之外,所有其他主题每个主题只有一个分区。
当代理尝试首选副本选择时,Streams 应用程序(在与代理完全不同的实例上运行)失败并显示错误:
Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_0] exception caught when producing
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:119)
...
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197)
Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
Streams 应用尝试成为分区的领导者是否正常,因为它运行的服务器不属于 Kafka 群集?
我可以通过以下方式按需重现此行为:
杀死- 其中一个代理(因此,其他 2 个代理接管所有分区的领导者,这些分区将杀死的代理作为他们的领导者,正如预期的那样)
- 将被杀的经纪人重新带回来
- 使用
bin/kafka-preferred-replica-election.sh --zookeeper localhost
触发首选副本领导者选举
我的问题似乎与此报告的失败类似,所以我想知道这是否是一个新的 Kafka Streams 错误。我的完整堆栈跟踪实际上与报告失败(此处)中链接的要点完全相同。
另一个可能有趣的细节是,在领导人选举期间,我在经纪人controller.log
中收到以下消息:
[2017-04-12 11:07:50,940] WARN [Controller-3-to-broker-3-send-thread], Controller 3's connection to broker BROKER-3-HOSTNAME:9092 (id: 3 rack: null) was unsuccessful (kafka.controller.RequestSendThread)
java.io.IOException: Connection to BROKER-3-HOSTNAME:9092 (id: 3 rack: null) failed
at kafka.utils.NetworkClientBlockingOps$.awaitReady$1(NetworkClientBlockingOps.scala:84)
at kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(NetworkClientBlockingOps.scala:94)
at kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:232)
at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:185)
at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:184)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
我最初认为这个连接错误是罪魁祸首,但在领导者选举导致 Streams 应用程序崩溃后,如果我重新启动 Streams 应用程序,它会正常工作,直到下一次选举,我根本没有接触经纪人。
所有服务器(3 个 Kafka 代理和 Streams 应用程序)都在 EC2 实例上运行。
这在 0.10.2.1 中现已修复。如果您无法获取它,请确保在流配置中按如下方式设置了这两个参数:
final Properties props = new Properties();
...
props.put(ProducerConfig.RETRIES_CONFIG, 10);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.toString(Integer.MAX_VALUE));