Spark错误:构造远程块读取器时出现I/O错误.java.nio.channels.ClosedByInterrupt



在单元测试中,执行在本地是正常的,但当Spark Streaming执行传播到真正的集群执行器时失败,就像它们无声地崩溃,不再适用于上下文:

stream execution thread for kafkaDataGeneratorInactiveESP_02/Distance [id = 438f45a0-acd6-4729-953f-5a18ae208f1f, runId = a98c6d39-fe14-4ed5-b7fe-7e4009de51b2]] impl.BlockReaderFactory (BlockReaderFactory.java:getRemoteBlockReaderFromTcp(765)) - I/O error constructing remote block reader.
java.nio.channels.ClosedByInterruptException
at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202)
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:656)
at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:192)
at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:533)
at org.apache.hadoop.hdfs.DFSClient.newConnectedPeer(DFSClient.java:2940)
at org.apache.hadoop.hdfs.client.impl.BlockReaderFactory.nextTcpPeer(BlockReaderFactory.java:822)
at org.apache.hadoop.hdfs.client.impl.BlockReaderFactory.getRemoteBlockReaderFromTcp(BlockReaderFactory.java:747)
at org.apache.hadoop.hdfs.client.impl.BlockReaderFactory.build(BlockReaderFactory.java:380)
at org.apache.hadoop.hdfs.DFSInputStream.getBlockReader(DFSInputStream.java:644)
at org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:575)
at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:757)
at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:829)
at java.io.DataInputStream.read(DataInputStream.java:149)
at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
at java.io.InputStreamReader.read(InputStreamReader.java:184)
at java.io.BufferedReader.fill(BufferedReader.java:161)
at java.io.BufferedReader.readLine(BufferedReader.java:324)
at java.io.BufferedReader.readLine(BufferedReader.java:389)
at scala.io.BufferedSource$BufferedLineIterator.hasNext(BufferedSource.scala:74)
at org.apache.spark.sql.execution.streaming.CommitLog.deserialize(CommitLog.scala:56)
at org.apache.spark.sql.execution.streaming.CommitLog.deserialize(CommitLog.scala:48)
at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.get(HDFSMetadataLog.scala:153)
at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.$anonfun$getLatest$2(HDFSMetadataLog.scala:190)
at scala.runtime.java8.JFunction1$mcVJ$sp.apply(JFunction1$mcVJ$sp.java:23)
at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofLong.foreach(ArrayOps.scala:258)
at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.getLatest(HDFSMetadataLog.scala:189)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.populateStartOffsets(MicroBatchExecution.scala:300)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:194)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:191)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:185)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:334)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:245)

我尝试的第一件事是更改查询名称,使用斜线和空格:kafkaDataGeneratorInactiveESP_02/Distance

在中将其重新排列为正确的之后

.queryName("kafkaDataGeneratorInactive" + currentIter.metadata.getString("label"))

100%证明字符串中有/或空格,错误没有消失。

失败的原因实际上是对查询名称和检查点位置路径使用了相同的名称(但不是第一次尝试改进的部分(。稍后我发现了另一个错误日志:

2021-12-01 15:05:46,906 WARN  [main] streaming.StreamingQueryManager (Logging.scala:logWarning(69)) - Stopping existing streaming query [id=b13a69d7-5a2f-461e-91a7-a9138c4aa716, runId=9cb31852-d276-42d8-ade6-9839fa97f85c], as a new run is being started.

为什么停止查询?这是因为在Scala中,我在循环中创建流式查询,迭代集合,同时保持所有查询名称和所有检查点名称相同。在使它们唯一(我只是使用了集合中的字符串值(之后,失败问题就消失了。

相关内容