java.nio.channels.ClosedChannelException while consuming messages from a Storm spout



I have written a Storm topology that uses a Kafka spout to consume data from Kafka. It works fine in my local environment, but fails when running on the cluster.

I am getting the following error:

2018-05-16 18:25:59.358 o.a.s.k.ZkCoordinator Thread-25-kafkaSpout-executor[20 20] [INFO] Task [1/1] Refreshing partition manager connections
2018-05-16 18:25:59.359 o.a.s.k.DynamicBrokersReader Thread-25-kafkaSpout-executor[20 20] [INFO] Read partition info from zookeeper: GlobalPartitionInformation{topic=data-ops, partitionMap={0=uat-datalake-node2.org:6667}}
2018-05-16 18:25:59.359 o.a.s.k.KafkaUtils Thread-25-kafkaSpout-executor[20 20] [INFO] Task [1/1] assigned [Partition{host=uat-datalake-node2.org:6667, topic=data-ops, partition=0}]
2018-05-16 18:25:59.360 o.a.s.k.ZkCoordinator Thread-25-kafkaSpout-executor[20 20] [INFO] Task [1/1] Deleted partition managers: []
2018-05-16 18:25:59.360 o.a.s.k.ZkCoordinator Thread-25-kafkaSpout-executor[20 20] [INFO] Task [1/1] New partition managers: []
2018-05-16 18:25:59.360 o.a.s.k.ZkCoordinator Thread-25-kafkaSpout-executor[20 20] [INFO] Task [1/1] Finished refreshing
2018-05-16 18:25:59.361 k.c.SimpleConsumer Thread-25-kafkaSpout-executor[20 20] [INFO] Reconnect due to error:
java.nio.channels.ClosedChannelException
	at kafka.network.BlockingChannel.send(BlockingChannel.scala:110) ~[kafka_2.10-0.10.2.1.jar:?]
	at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:85) [kafka_2.10-0.10.2.1.jar:?]
	at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83) [kafka_2.10-0.10.2.1.jar:?]
	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:132) [kafka_2.10-0.10.2.1.jar:?]
	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132) [kafka_2.10-0.10.2.1.jar:?]
	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132) [kafka_2.10-0.10.2.1.jar:?]
	at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) [kafka_2.10-0.10.2.1.jar:?]
	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:131) [kafka_2.10-0.10.2.1.jar:?]
	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131) [kafka_2.10-0.10.2.1.jar:?]
	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131) [kafka_2.10-0.10.2.1.jar:?]
	at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) [kafka_2.10-0.10.2.1.jar:?]
	at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:130) [kafka_2.10-0.10.2.1.jar:?]
	at kafka.javaapi.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:47) [kafka_2.10-0.10.2.1.jar:?]
	at org.apache.storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:191) [storm-kafka-1.0.1.jar:1.0.1]
	at org.apache.storm.kafka.PartitionManager.fill(PartitionManager.java:189) [storm-kafka-1.0.1.jar:1.0.1]
	at org.apache.storm.kafka.PartitionManager.next(PartitionManager.java:138) [storm-kafka-1.0.1.jar:1.0.1]
	at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:135) [storm-kafka-1.0.1.jar:1.0.1]
	at org.apache.storm.daemon.executor$fn__6505$fn__6520$fn__6551.invoke(executor.clj:651) [storm-core-1.0.1.2.5.3.0-37.jar:1.0.1.2.5.3.0-37]
	at org.apache.storm.util$async_loop$fn__554.invoke(util.clj:484) [storm-core-1.0.1.2.5.3.0-37.jar:1.0.1.2.5.3.0-37]
	at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_144]
2018-05-16 18:26:09.372 o.a.s.k.KafkaUtils Thread-25-kafkaSpout-executor[20 20] [WARN] Network error when fetching messages:
java.net.SocketTimeoutException
	at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:211) ~[?:1.8.0_144]
	at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103) ~[?:1.8.0_144]
	at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385) ~[?:1.8.0_144]
	at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:81) ~[kafka-clients-0.10.2.1.jar:?]
	at kafka.network.BlockingChannel.readComplete(BlockingChannel.scala:129) ~[kafka_2.10-0.10.2.1.jar:?]
	at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120) ~[kafka_2.10-0.10.2.1.jar:?]
	at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:99) ~[kafka_2.10-0.10.2.1.jar:?]
	at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83) ~[kafka_2.10-0.10.2.1.jar:?]
	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:132) ~[kafka_2.10-0.10.2.1.jar:?]
	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132) ~[kafka_2.10-0.10.2.1.jar:?]
	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132) ~[kafka_2.10-0.10.2.1.jar:?]
	at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) ~[kafka_2.10-0.10.2.1.jar:?]
	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:131) ~[kafka_2.10-0.10.2.1.jar:?]
	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131) ~[kafka_2.10-0.10.2.1.jar:?]
	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131) ~[kafka_2.10-0.10.2.1.jar:?]
	at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) ~[kafka_2.10-0.10.2.1.jar:?]
	at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:130) ~[kafka_2.10-0.10.2.1.jar:?]
	at kafka.javaapi.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:47) ~[kafka_2.10-0.10.2.1.jar:?]
	at org.apache.storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:191) [storm-kafka-1.0.1.jar:1.0.1]
	at org.apache.storm.kafka.PartitionManager.fill(PartitionManager.java:189) [storm-kafka-1.0.1.jar:1.0.1]
	at org.apache.storm.kafka.PartitionManager.next(PartitionManager.java:138) [storm-kafka-1.0.1.jar:1.0.1]
	at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:135) [storm-kafka-1.0.1.jar:1.0.1]
	at org.apache.storm.daemon.executor$fn__6505$fn__6520$fn__6551.invoke(executor.clj:651) [storm-core-1.0.1.2.5.3.0-37.jar:1.0.1.2.5.3.0-37]
	at org.apache.storm.util$async_loop$fn__554.invoke(util.clj:484) [storm-core-1.0.1.2.5.3.0-37.jar:1.0.1.2.5.3.0-37]
	at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_144]
2018-05-16 18:26:09.373 o.a.s.k.KafkaSpout Thread-25-kafkaSpout-executor[20 20] [WARN] Fetch failed
org.apache.storm.kafka.FailedFetchException: java.net.SocketTimeoutException
	at org.apache.storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:199) ~[storm-kafka-1.0.1.jar:1.0.1]
	at org.apache.storm.kafka.PartitionManager.fill(PartitionManager.java:189) ~[storm-kafka-1.0.1.jar:1.0.1]
	at org.apache.storm.kafka.PartitionManager.next(PartitionManager.java:138) ~[storm-kafka-1.0.1.jar:1.0.1]
	at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:135) [storm-kafka-1.0.1.jar:1.0.1]
	at org.apache.storm.daemon.executor$fn__6505$fn__6520$fn__6551.invoke(executor.clj:651) [storm-core-1.0.1.2.5.3.0-37.jar:1.0.1.2.5.3.0-37]
	at org.apache.storm.util$async_loop$fn__554.invoke(util.clj:484) [storm-core-1.0.1.2.5.3.0-37.jar:1.0.1.2.5.3.0-37]
	at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_144]
Caused by: java.net.SocketTimeoutException
	at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:211) ~[?:1.8.0_144]
	at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103) ~[?:1.8.0_144]
	at java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:385) ~[?:1.8.0_144]
	at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:81) ~[kafka-clients-0.10.2.1.jar:?]
	at kafka.network.BlockingChannel.readComplete(BlockingChannel.scala:129) ~[kafka_2.10-0.10.2.1.jar:?]
	at kafka.network.BlockingChannel.receive(BlockingChannel.scala:120) ~[kafka_2.10-0.10.2.1.jar:?]
	at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:99) ~[kafka_2.10-0.10.2.1.jar:?]
	at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:83) ~[kafka_2.10-0.10.2.1.jar:?]
	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:132) ~[kafka_2.10-0.10.2.1.jar:?]
	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132) ~[kafka_2.10-0.10.2.1.jar:?]
	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:132) ~[kafka_2.10-0.10.2.1.jar:?]
	at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) ~[kafka_2.10-0.10.2.1.jar:?]
	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:131) ~[kafka_2.10-0.10.2.1.jar:?]
	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131) ~[kafka_2.10-0.10.2.1.jar:?]
	at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:131) ~[kafka_2.10-0.10.2.1.jar:?]
	at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) ~[kafka_2.10-0.10.2.1.jar:?]
	at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:130) ~[kafka_2.10-0.10.2.1.jar:?]
	at kafka.javaapi.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:47) ~[kafka_2.10-0.10.2.1.jar:?]
	at org.apache.storm.kafka.KafkaUtils.fetchMessages(KafkaUtils.java:191) ~[storm-kafka-1.0.1.jar:1.0.1]
	... 7 more

It looks like you're hitting a timeout when the Storm workers try to read from the Kafka brokers. Perhaps the connection between them is flaky or slow?

That said, the stack trace seems to show that the consumer reconnected afterwards, so if this happens only rarely, you may simply have hit a hiccup in the connection between the workers and Kafka.

If it happens frequently and you're sure the connection is stable, I would try asking on the Kafka mailing list at https://kafka.apache.org/contact. If you post the question along with the Kafka version you're using, they may be able to tell you whether there is a known issue that could cause consumer socket timeouts.
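If the timeouts turn out to be transient slowness rather than a dead link, one knob worth trying is the spout's socket timeout. Your trace shows the old storm-kafka `SpoutConfig`/`SimpleConsumer` path, where `socketTimeoutMs` defaults to 10 seconds. Below is a rough sketch of raising it; the ZooKeeper connect string, ZK root path, and consumer id are placeholders you would replace with your own values:

```java
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.spout.SchemeAsMultiScheme;

public class DataOpsSpoutSketch {
    public static KafkaSpout buildSpout() {
        // Placeholder ZooKeeper connect string; use your cluster's quorum.
        BrokerHosts hosts = new ZkHosts("zk-host:2181");

        // topic from the log ("data-ops"); zkRoot and id are made-up placeholders.
        SpoutConfig spoutConfig =
                new SpoutConfig(hosts, "data-ops", "/data-ops", "data-ops-spout");
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

        // Both default to 10000 ms; raising them gives a slow-but-healthy
        // broker link more headroom before a SocketTimeoutException fires.
        spoutConfig.socketTimeoutMs = 30000; // SimpleConsumer socket read timeout
        spoutConfig.fetchMaxWait = 10000;    // max broker-side wait per fetch

        return new KafkaSpout(spoutConfig);
    }
}
```

This only papers over latency; if `ClosedChannelException` keeps recurring, it usually means the broker is actively closing the connection (broker restart, wrong advertised host/port, or stale partition metadata in ZooKeeper), which no timeout setting will fix.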
