Spark Structured Streaming cannot read data from Kafka in Docker



I submit a Spark Structured Streaming job that reads from Kafka, but I cannot get it to work. Sample code of the Spark job:

import org.apache.spark.sql.SparkSession

object KafkaStructuredStreaming {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .appName(getClass.getName)
      .master("spark://spark-master:7077")
      .getOrCreate()

    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "kafka:9092")
      .option("startingOffsets", "earliest")
      .option("subscribe", "tweet-upload-6")
      .option("enable.auto.commit", false)
      .option("group.id", "Structured-Streaming-Examples")
      .option("failOnDataLoss", false)
      .load()

    df.printSchema()

    val consoleOutput = df.writeStream
      .outputMode("append")
      .format("console")
      .start()

    consoleOutput.awaitTermination()
  }
}

Note: the Kafka and Spark nodes are on the same Docker network. Everything worked before with Spark Streaming (DStreams), but I switched to Structured Streaming because I have a single-input-stream -> multiple-output-streams problem.

I am now getting the following errors:

07721793-driver-0] Error connecting to node kafka:9092 (id: 1001 rack: null)
submit-spark-job  | java.net.UnknownHostException: kafka
submit-spark-job  |     at java.net.InetAddress.getAllByName0(InetAddress.java:1282)
submit-spark-job  |     at java.net.InetAddress.getAllByName(InetAddress.java:1194)
submit-spark-job  |     at java.net.InetAddress.getAllByName(InetAddress.java:1128)
submit-spark-job  |     at org.apache.kafka.clients.DefaultHostResolver.resolve(DefaultHostResolver.java:27)
submit-spark-job  |     at org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:111)
submit-spark-job  |     at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:512)
submit-spark-job  |     at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:466)
submit-spark-job  |     at org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:172)
submit-spark-job  |     at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:985)
submit-spark-job  |     at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:311)
submit-spark-job  |     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.trySend(ConsumerNetworkClient.java:498)
submit-spark-job  |     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:255)
submit-spark-job  |     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.pollNoWakeup(ConsumerNetworkClient.java:306)
submit-spark-job  |     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread.run(AbstractCoordinator.java:1367)
submit-spark-job  | 21/10/18 22:44:41 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-4d4a80ae-01a7-4679-8393-42aaa0e35e6a-307721793-driver-0-1, groupId=spark-kafka-source-4d4a80ae-01a7-4679-8393-42aaa0e35e6a-307721793-driver-0] Error connecting to node kafka:9092 (id: 1001 rack: null)
submit-spark-job  | java.net.UnknownHostException: kafka

85949862d6d3-1705030665-driver-0] Group coordinator kafka:9092 (id: 2147482646 rack: null) is unavailable or invalid due to cause: null.isDisconnected: true. Rediscovery will be attempted.
submit-spark-job  | 21/10/18 23:03:46 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-aafbb352-744d-438a-bd45-85949862d6d3-1705030665-driver-0-1, groupId=spark-kafka-source-aafbb352-744d-438a-bd45-85949862d6d3-1705030665-driver-0] Connection to node 1001 (kafka/172.18.0.4:9092) could not be established. Broker may not be available.
submit-spark-job  | 21/10/18 23:03:47 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-aafbb352-744d-438a-bd45-85949862d6d3-1705030665-driver-0-1, groupId=spark-kafka-source-aafbb352-744d-438a-bd45-85949862d6d3-1705030665-driver-0] Connection to node 1001 (kafka/172.18.0.4:9092) could not be established. Broker may not be available.
submit-spark-job  | 21/10/18 23:03:47 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-aafbb352-744d-438a-bd45-85949862d6d3-1705030665-driver-0-1, groupId=spark-kafka-source-aafbb352-744d-438a-bd45-85949862d6d3-1705030665-driver-0] Connection to node 1001 (kafka/172.18.0.4:9092) could not be established. Broker may not be available.
submit-spark-job  | 21/10/18 23:03:48 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-aafbb352-744d-438a-bd45-85949862d6d3-1705030665-driver-0-1, groupId=spark-kafka-source-aafbb352-744d-438a-bd45-85949862d6d3-1705030665-driver-0] Connection to node 1001 (kafka/172.18.0.4:9092) could not be established. Broker may not be available.
submit-spark-job  | 21/10/18 23:03:48 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0) (172.18.0.7 executor 0): org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
submit-spark-job  |     at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:823)
submit-spark-job  |     at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:665)
submit-spark-job  |     at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:613)
submit-spark-job  |     at org.apache.spark.sql.kafka010.consumer.InternalKafkaConsumer.createConsumer(KafkaDataConsumer.scala:124)
submit-spark-job  |     at org.apache.spark.sql.kafka010.consumer.InternalKafkaConsumer.<init>(KafkaDataConsumer.scala:61)
submit-spark-job  |     at org.apache.spark.sql.kafka010.consumer.InternalKafkaConsumerPool$ObjectFactory.create(InternalKafkaConsumerPool.scala:206)
submit-spark-job  |     at org.apache.spark.sql.kafka010.consumer.InternalKafkaConsumerPool$ObjectFactory.create(InternalKafkaConsumerPool.scala:201)
submit-spark-job  |     at org.apache.commons.pool2.BaseKeyedPooledObjectFactory.makeObject(BaseKeyedPooledObjectFactor

Docker compose files:

version: '2'
services:
  zookeeper:
    image: zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: linuxkitpoc/kafka:latest
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_ADVERTISED_HOST_NAME: kafka
      KAFKA_ADVERTISED_PORT: "9092"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
  tweet-producer:
    image: mkovacevic/tweet-producer-app:latest
    ports:
      - "8080:8080"
    tty: true
    depends_on:
      - kafka
(this stack creates the default network twitter-streaming_default), and

version: '3'
services:
  spark-master:
    image: bde2020/spark-master:3.1.1-hadoop3.2
    container_name: spark-master
    hostname: spark-master
    ports:
      - "7077:7077"
    environment:
      - INIT_DAEMON_STEP=setup_spark
  spark-worker-1:
    image: bde2020/spark-worker:3.1.1-hadoop3.2
    container_name: spark-worker-1
    depends_on:
      - spark-master
    ports:
      - "8081:8081"
    environment:
      - "SPARK_MASTER=spark://spark-master:7077"
  spark-worker-2:
    image: bde2020/spark-worker:3.1.1-hadoop3.2
    container_name: spark-worker-2
    depends_on:
      - spark-master
    ports:
      - "8082:8081"
    environment:
      - "SPARK_MASTER=spark://spark-master:7077"
  submit-spark:
    image: mkovacevic/spark-streaming-app:latest
    container_name: submit-spark-job
    depends_on:
      - spark-master
      - spark-worker-1
      - spark-worker-2
networks:
  default:
    external: true
    name: twitter-streaming_default

Any suggestions?

Each Compose file creates its own, isolated default bridge network.

If you want to keep using two files, you have to explicitly attach every service to a shared network (see the sketch below). Otherwise, you need to put all of the services in a single file.

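A minimal sketch of the two-file approach, reusing the twitter-streaming_default network that your Kafka stack already creates; the image names are taken from your files, and the per-service networks entries are the part you would add (workers omitted for brevity):

# Sketch only: attach the Spark services explicitly to the network that the
# Kafka compose file already created (twitter-streaming_default).
version: '3.5'   # 3.5+ is needed for "external: true" together with "name:"

networks:
  kafka-net:
    external: true
    name: twitter-streaming_default

services:
  spark-master:
    image: bde2020/spark-master:3.1.1-hadoop3.2
    networks:
      - kafka-net
  submit-spark:
    image: mkovacevic/spark-streaming-app:latest
    depends_on:
      - spark-master
    networks:
      - kafka-net
  # spark-worker-1 / spark-worker-2 would get the same "networks" entry

Once both stacks are up, docker network inspect twitter-streaming_default should list the Kafka and Spark containers side by side, and the driver and executors should then be able to resolve the kafka hostname.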
By the way, the linuxkitpoc/kafka image has not been updated in years; I would use something else for Kafka. Personally, I recommend the images from Bitnami.
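For reference, a rough sketch of what the Kafka part could look like with the Bitnami images, still ZooKeeper-based like your current setup. Treat the environment variables as assumptions and check them against the bitnami/kafka documentation for the tag you pick:

# Rough sketch of a Bitnami-based Kafka stack (verify the env vars for your tag)
version: '2'
services:
  zookeeper:
    image: bitnami/zookeeper:latest
    ports:
      - "2181:2181"
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: bitnami/kafka:latest
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      # advertise the in-network hostname so other containers can reach the broker
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092
      - ALLOW_PLAINTEXT_LISTENER=yes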