Kafka java.io.EOFException - NetworkReceive.readFromReadable



我正在尝试从Apache Spark 2.2.1结构化流连接到IBM Message Hub。

连接代码非常基本:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("StreamingRetailTransactions").getOrCreate()
import spark.implicits._
val df = spark.readStream.
format("kafka").
option("kafka.bootstrap.servers", "kafka03-prod02.messagehub.services.eu-gb.bluemix.net:9093,kafka04-prod02.messagehub.services.eu-gb.bluemix.net:9093,kafka01-prod02.messagehub.services.eu-gb.bluemix.net:9093,kafka02-prod02.messagehub.services.eu-gb.bluemix.net:9093,kafka05-prod02.messagehub.services.eu-gb.bluemix.net:9093").
option("subscribe", "transactions_load").
option("security.protocol", "SASL_SSL").
option("sasl.mechanism", "PLAIN").
option("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username="*****" password="*****";").
option("ssl.protocol", "TLSv1.2").
option("ssl.enabled.protocols", "TLSv1.2").
option("ssl.endpoint.identification.algorithm", "HTTPS").
option("auto.offset.reset","earliest").
option("group.id", System.currentTimeMillis).
load()
val query = df.writeStream.format("console").start()

我正在启动火花壳:

~/spark-2.2.1-bin-hadoop2.7$ ./bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0

但是,我收到一个断开连接的错误:

scala> 18/01/09 08:34:17 WARN NetworkClient: Bootstrap broker kafka02-prod02.messagehub.services.eu-gb.bluemix.net:9093 disconnected
18/01/09 08:34:17 WARN NetworkClient: Bootstrap broker kafka03-prod02.messagehub.services.eu-gb.bluemix.net:9093 disconnected
18/01/09 08:34:17 WARN NetworkClient: Bootstrap broker kafka01-prod02.messagehub.services.eu-gb.bluemix.net:9093 disconnected
<<repeats forever>>

我用sc.setLogLevel("DEBUG")增加了调试输出,我得到:

<<log output ommitted for brevity>>
18/01/09 08:38:28 DEBUG SessionState: SessionState user: null
18/01/09 08:38:28 DEBUG SessionState: HDFS root scratch dir: /tmp/hive with schema null, permission: rwx-wx-wx
18/01/09 08:38:28 INFO SessionState: Created local directory: /tmp/2b4557ce-dd17-46d1-9ab0-f9a36fd750f9_resources
18/01/09 08:38:28 INFO SessionState: Created HDFS directory: /tmp/hive/snowch/2b4557ce-dd17-46d1-9ab0-f9a36fd750f9
18/01/09 08:38:28 INFO SessionState: Created local directory: /tmp/snowch/2b4557ce-dd17-46d1-9ab0-f9a36fd750f9
18/01/09 08:38:28 INFO SessionState: Created HDFS directory: /tmp/hive/snowch/2b4557ce-dd17-46d1-9ab0-f9a36fd750f9/_tmp_space.db
18/01/09 08:38:28 INFO HiveClientImpl: Warehouse location for Hive client (version 1.2.1) is file:/home/snowch/spark-2.2.1-bin-hadoop2.7/spark-warehouse
18/01/09 08:38:28 DEBUG SessionState: Session is using authorization class class org.apache.hadoop.hive.ql.security.authorization.DefaultHiveAuthorizationProvider
18/01/09 08:38:28 DEBUG StateStoreCoordinatorRef: Retrieved existing StateStoreCoordinator endpoint
18/01/09 08:38:28 DEBUG StreamExecution: Starting Trigger Calculation
18/01/09 08:38:28 INFO StreamExecution: Starting new streaming query.
18/01/09 08:38:28 DEBUG UserGroupInformation: PrivilegedAction as:snowch (auth:SIMPLE) from:org.apache.hadoop.fs.FileContext.getAbstractFileSystem(FileContext.java:331)
18/01/09 08:38:28 DEBUG KafkaSource$$anon$1: Unable to find batch /tmp/temporary-fb3e6e1a-fbbe-4098-991c-0b29f63ecade/sources/0/0
18/01/09 08:38:28 DEBUG AbstractCoordinator: Sending coordinator request for group spark-kafka-source-9a14bb54-8f1b-47db-8497-19c083128496--998588290-driver-0 to broker kafka05-prod02.messagehub.services.eu-gb.bluemix.net:9093 (id: -5 rack: null)
18/01/09 08:38:28 DEBUG NetworkClient: Initiating connection to node -5 at kafka05-prod02.messagehub.services.eu-gb.bluemix.net:9093.
18/01/09 08:38:28 DEBUG NetworkClient: Initialize connection to node -3 for sending metadata request
18/01/09 08:38:28 DEBUG NetworkClient: Initiating connection to node -3 at kafka01-prod02.messagehub.services.eu-gb.bluemix.net:9093.
18/01/09 08:38:28 DEBUG Metrics: Added sensor with name node--5.bytes-sent
18/01/09 08:38:28 DEBUG Metrics: Added sensor with name node--5.bytes-received
18/01/09 08:38:28 DEBUG Metrics: Added sensor with name node--5.latency
18/01/09 08:38:28 DEBUG NetworkClient: Completed connection to node -5
18/01/09 08:38:28 DEBUG Metrics: Added sensor with name node--3.bytes-sent
18/01/09 08:38:28 DEBUG Metrics: Added sensor with name node--3.bytes-received
18/01/09 08:38:28 DEBUG Metrics: Added sensor with name node--3.latency
18/01/09 08:38:28 DEBUG NetworkClient: Completed connection to node -3
18/01/09 08:38:28 DEBUG NetworkClient: Sending metadata request {topics=[transactions_load]} to node -5
18/01/09 08:38:28 DEBUG Selector: Connection with kafka05-prod02.messagehub.services.eu-gb.bluemix.net/159.8.179.153 disconnected
java.io.EOFException
at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83)
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:154)
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:135)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:323)
at org.apache.kafka.common.network.Selector.poll(Selector.java:283)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:179)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:974)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1$$anonfun$apply$9.apply(KafkaOffsetReader.scala:174)
at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1$$anonfun$apply$9.apply(KafkaOffsetReader.scala:172)
at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt$1.apply$mcV$sp(KafkaOffsetReader.scala:263)
at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt$1.apply(KafkaOffsetReader.scala:262)
at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt$1.apply(KafkaOffsetReader.scala:262)
at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:85)
at org.apache.spark.sql.kafka010.KafkaOffsetReader.org$apache$spark$sql$kafka010$KafkaOffsetReader$$withRetriesWithoutInterrupt(KafkaOffsetReader.scala:261)
at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1.apply(KafkaOffsetReader.scala:172)
at org.apache.spark.sql.kafka010.KafkaOffsetReader$$anonfun$fetchLatestOffsets$1.apply(KafkaOffsetReader.scala:172)
at org.apache.spark.sql.kafka010.KafkaOffsetReader.runUninterruptibly(KafkaOffsetReader.scala:230)
at org.apache.spark.sql.kafka010.KafkaOffsetReader.fetchLatestOffsets(KafkaOffsetReader.scala:171)
at org.apache.spark.sql.kafka010.KafkaSource$$anonfun$initialPartitionOffsets$1.apply(KafkaSource.scala:132)
at org.apache.spark.sql.kafka010.KafkaSource$$anonfun$initialPartitionOffsets$1.apply(KafkaSource.scala:129)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.kafka010.KafkaSource.initialPartitionOffsets$lzycompute(KafkaSource.scala:129)
at org.apache.spark.sql.kafka010.KafkaSource.initialPartitionOffsets(KafkaSource.scala:97)
at org.apache.spark.sql.kafka010.KafkaSource.getOffset(KafkaSource.scala:163)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$10$$anonfun$apply$6.apply(StreamExecution.scala:521)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$10$$anonfun$apply$6.apply(StreamExecution.scala:521)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:279)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$10.apply(StreamExecution.scala:520)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$10.apply(StreamExecution.scala:518)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$constructNextBatch(StreamExecution.scala:518)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$populateStartOffsets(StreamExecution.scala:492)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(StreamExecution.scala:297)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply(StreamExecution.scala:294)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply(StreamExecution.scala:294)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:279)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:294)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:290)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:206)
18/01/09 08:38:28 DEBUG NetworkClient: Node -5 disconnected.
18/01/09 08:38:28 WARN NetworkClient: Bootstrap broker kafka05-prod02.messagehub.services.eu-gb.bluemix.net:9093 disconnected
18/01/09 08:38:28 DEBUG ConsumerNetworkClient: Cancelled GROUP_COORDINATOR request ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@4eacac4e, request=RequestSend(header={api_key=10,api_version=0,correlation_id=0,client_id=consumer-1}, body={group_id=spark-kafka-source-9a14bb54-8f1b-47db-8497-19c083128496--998588290-driver-0}), createdTimeMs=1515487108479, sendTimeMs=1515487108598) with correlation id 0 due to node -5 being disconnected
18/01/09 08:38:28 DEBUG NetworkClient: Sending metadata request {topics=[transactions_load]} to node -3
18/01/09 08:38:28 DEBUG Selector: Connection with kafka01-prod02.messagehub.services.eu-gb.bluemix.net/159.8.179.149 disconnected
java.io.EOFException
at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83)
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
<<repeated>>

我见过以下类似的问题:

  • I/O java.io.EOFException中的Kafka错误:null - 但是,这个问题适用于旧版本的Kafka。

我知道某些输出只是"噪音",但是,我的 Spark 流应用程序似乎没有接收任何数据。 我已连接到控制台客户端,并且能够查看数据。


更新 1- 我尝试配置 JaaS,但仍然收到相同的错误。 问题可能是 JaaS 代码需要在每个工作节点上运行,但未在其上运行。

sc.setLogLevel("DEBUG")
def jaasClientConfig(username: String, password: String): Unit = {
import javax.security.auth.login.AppConfigurationEntry
import javax.security.auth.login.Configuration
import javax.security.auth.login.LoginException
import scala.collection.JavaConversions._
System.setProperty("java.security.auth.login.config", "")
Configuration.setConfiguration(new Configuration() {
def getAppConfigurationEntry(name: String): Array[AppConfigurationEntry] = {
val idMap = Map(
"serviceName" -> "kafka",
"username" -> username,
"password" -> password
)
val ace = new AppConfigurationEntry(
"org.apache.kafka.common.security.plain.PlainLoginModule",
AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
idMap
)
return Array(ace)
}
})
}
def startSparkStreaming(): Unit = {
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("StreamingRetailTransactions").getOrCreate()
import spark.implicits._
val df = spark.readStream.
format("kafka").
option("kafka.bootstrap.servers", "kafka03-prod02.messagehub.services.eu-gb.bluemix.net:9093,kafka04-prod02.messagehub.services.eu-gb.bluemix.net:9093,kafka01-prod02.messagehub.services.eu-gb.bluemix.net:9093,kafka02-prod02.messagehub.services.eu-gb.bluemix.net:9093,kafka05-prod02.messagehub.services.eu-gb.bluemix.net:9093").
option("subscribe", "transactions_load").
option("security.protocol", "SASL_SSL").
option("sasl.mechanism", "PLAIN").
option("ssl.protocol", "TLSv1.2").
option("ssl.enabled.protocols", "TLSv1.2").
option("ssl.endpoint.identification.algorithm", "HTTPS").
option("auto.offset.reset","earliest").
option("group.id", System.currentTimeMillis).
load()
val query = df.writeStream.format("console").start()
}
jaasClientConfig("****","****")
startSparkStreaming()

更新 2

我也尝试过使用jaas.conf:

~/spark-2.2.1-bin-hadoop2.7$ ./bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.conf=jaas.conf" --files "jaas.conf"

和。。。

KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="*****"
password="*****";
}

还是同样的问题...

首先,我需要用--conf运行火花外壳,将执行器和驱动程序指向我的jaas.conf:

./bin/spark-shell --master local[1] 
--jars external/kafka-0-10-sql/target/spark-sql-kafka-0-10_2.11-2.2.2-SNAPSHOT.jar,external/kafka-0-10-assembly/target/spark-streaming-kafka-0-10-assembly_2.11-2.2.2-SNAPSHOT.jar 
--conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=/path/to/jaas.conf" 
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/path/to/jaas.conf" 
--num-executors 1  --executor-cores 1 

接下来,我必须添加一些 kafka 选项:

val df = spark.readStream.
format("kafka").
option("kafka.bootstrap.servers", "kafka03-prod02.messagehub.services.eu-gb.bluemix.net:9093,kafka04-prod02.messagehub.services.eu-gb.bluemix.net:9093,kafka01-prod02.messagehub.services.eu-gb.bluemix.net:9093,kafka02-prod02.messagehub.services.eu-gb.bluemix.net:9093,kafka05-prod02.messagehub.services.eu-gb.bluemix.net:9093").
option("subscribe", "transactions_load").
option("kafka.security.protocol", "SASL_SSL").
option("kafka.sasl.mechanism", "PLAIN").
option("kafka.ssl.protocol", "TLSv1.2").
option("kafka.ssl.enabled.protocols", "TLSv1.2").
load()

请注意,kafka 选项需要以kafka.为前缀,例如:

  • security.protocol=>kafka.security.protocol

这些更改为我解决了连接问题。

这似乎是身份验证失败。在当前的 MH 版本的 Kafka 中,服务器只是在身份验证失败时关闭连接

"sasl.jaas.config" 设置在 0.10.2 版本的 Kafka 客户端中使用

Spark 2.2.1 使用的 Kafka 客户端为 0.10.0,因此身份验证会像怀疑的那样失败。

可以使用"java.security.auth.login.config系统"属性来指定 JAAS 文件

或者,您可以使用如下所示的代码段以编程方式为客户端设置凭据

public static void jaasClientConfig(final String username, final String password) throws Exception {
System.setProperty("java.security.auth.login.config", "");
Configuration.setConfiguration(new Configuration() {
public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
HashMap<String, String> idMap = new HashMap<>();
idMap.put("serviceName", "kafka"); // Seems to be optional
idMap.put("username", username);
idMap.put("password", password);
AppConfigurationEntry ace = new AppConfigurationEntry("org.apache.kafka.common.security.plain.PlainLoginModule",
AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, idMap);
AppConfigurationEntry[] entry = { ace };
return entry;
}
});
}

最新更新