Kafka 消费者总是给出一个 java.nio.channels.ClosedChannelException



我正在尝试执行以下命令:

./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 101.10.51.1:9092,101.10.51.4:9092 --topic namespace_deep_archive_d_billing_transaction --time -2

并且程序总是收到以下错误:

[2018-08-23 12:36:58,604] WARN Fetching topic metadata with correlation id 0 for topics [Set(namespace_deep_archive_d_billing_transaction)] from broker [BrokerEndPoint(0,101.10.51.1,9092)] failed (kafka.client.ClientUtils$)
java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:124)
at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:82)
at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:81)
at kafka.producer.SyncProducer.send(SyncProducer.scala:126)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:63)
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:99)
at kafka.tools.GetOffsetShell$.main(GetOffsetShell.scala:98)
at kafka.tools.GetOffsetShell.main(GetOffsetShell.scala)
[2018-08-23 12:36:59,616] WARN Fetching topic metadata with correlation id 0 for topics [Set(namespace_deep_archive_d_billing_transaction)] from broker [BrokerEndPoint(1,101.10.51.4,9092)] failed (kafka.client.ClientUtils$)

我正在从其他服务器运行getOffset。但是,此服务器可以远程登录到 kafka 代理。

如果有人遇到这个问题,你是如何解决的?

我浏览了GetOffsetShell并将问题追溯到文件中缺少/etc/hosts条目。

这是来自 GetOffsetShell.scala 的代码片段

val url = new URI(options.valueOf(urlOpt))
val topic = options.valueOf(topicOpt)
val partition = options.valueOf(partitionOpt).intValue
var time = options.valueOf(timeOpt).longValue
val nOffsets = options.valueOf(nOffsetsOpt).intValue
val consumer = new SimpleConsumer(url.getHost, url.getPort, 10000, 100000)

即使我传递代理的 IP 地址,它们也会解析为相应的主机名。 消费者代码对/etc/hosts进行查找,找不到主机名和 IP 地址之间的映射,引发异常。

在/etc/hosts 中添加服务器名称和 IP 时,代码现在可以从 kafka 代理获得偏移和使用记录。

参考: https://github.com/kafka-dev/kafka/blob/master/core/src/main/scala/kafka/tools/GetOffsetShell.scala https://github.com/kafka-dev/kafka/blob/master/core/src/main/scala/kafka/consumer/SimpleConsumer.scala https://github.com/spujadas/elk-docker/issues/54

相关内容

最新更新