Spark: writing to Kafka gives error: org.apache.kafka.common.errors.TimeoutException



I have a PySpark streaming job that reads from a socket and writes to a Kafka topic.
When I write to the console it prints the data fine, but when I write to the Kafka topic it gives me org.apache.kafka.common.errors.TimeoutException: Topic RANDOM_STRING not present in metadata after 60000 ms.
The logs are:

2022-12-13 09:18:22,631 - my_module_lib_io.streaming.kafka.spark.producer - INFO - Open Socket on localhost 9999 by using command on another shell: nc -l localhost 9999
22/12/13 09:18:22 WARN sources.TextSocketSourceProvider: The socket source should not be used for production applications! It does not support recovery.
22/12/13 09:18:25 WARN sources.TextSocketSourceProvider: The socket source should not be used for production applications! It does not support recovery.
[Stage 1:>                                                          (0 + 3) / 3]
22/12/13 09:19:34 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 1.0 (TID 2, server.com, executor 1): org.apache.kafka.common.errors.TimeoutException: Topic RANDOM_STRING not present in metadata after 60000 ms.

I am running Apache Kafka (kafka_2.11-2.0.0) on localhost:9092 with no SSL enabled. server.properties looks like this:

broker.id=1
group.initial.rebalance.delay.ms=0
listeners=PLAINTEXT://:9092
log.dirs=/home/aiman/kafka_local/kafka_2.11-2.0.0/kafka-logs
log.retention.check.interval.ms=300000
log.retention.hours=168
log.segment.bytes=1073741824
max.poll.interval.ms=5
num.io.threads=8
num.network.threads=3
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
session.timeout.ms=3
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
socket.send.buffer.bytes=102400
transaction.state.log.min.isr=1
transaction.state.log.replication.factor=1
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000

The topic (RANDOM_STRING) already exists:

./kafka-topics.sh --zookeeper localhost:2181 --list
TEST_TOPIC
RANDOM_STRING
__consumer_offsets

My code is:

kafka_config = {
    "kafka.bootstrap.servers": "localhost:9092",
    "checkpointLocation": "/user/aiman/checkpoint/kafka_local/random_string",
    "topic": "RANDOM_STRING"
}

data = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()

# This prints the dataframe to the console:
# data.writeStream \
#     .format("console") \
#     .outputMode("append") \
#     .option("truncate", False) \
#     .start() \
#     .awaitTermination()

data.writeStream \
    .format("kafka") \
    .outputMode("append") \
    .options(**kafka_config) \
    .start() \
    .awaitTermination()

I am using the following spark-sql connector: spark-sql-kafka-0-10_2.11-2.4.0-cdh6.3.4.jar
The Spark version is 2.4.0-cdh6.3.4,
and the Apache Kafka version is kafka_2.11-2.0.0.

UPDATE:
Changed the Apache Kafka version to kafka_2.11-0.10.0.0.

Am I missing any configuration?

Cannot reproduce. Is RANDOM actually random? If so, I suspect that is the problem. Otherwise, Spark is distributed, but you are pointing at Kafka on localhost, so do you have multiple Kafka clusters, one of which does not have the topic you are using?

Using the latest Spark (though that is probably not your problem; still, I recommend not using the CDH-specific builds) and Kafka 3.3 (again, the version should not really matter):

Create and check the topic

$ kafka-topics --list --bootstrap-server localhost:9092 | grep RANDOM_STRING
RANDOM_STRING

Start the socket server

$ nc -lk 9999

Run the code, using the package matching my Spark version. Note: the master is local, not YARN in CDH.

spark-submit --master=local --packages 'org.apache.spark:spark-sql-kafka-0-10_2.13:3.3.1' example.py
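
A minimal sketch of what example.py could contain (the host, port, topic, and checkpoint path are assumptions taken from the question's setup, not the exact file I ran):

# example.py - illustrative sketch only
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("socket-to-kafka").getOrCreate()

# The socket source yields a single string column named "value",
# which is exactly the column the Kafka sink expects.
data = (spark.readStream
        .format("socket")
        .option("host", "localhost")
        .option("port", 9999)
        .load())

# Write every micro-batch to the RANDOM_STRING topic on the local broker.
(data.writeStream
     .format("kafka")
     .option("kafka.bootstrap.servers", "localhost:9092")
     .option("topic", "RANDOM_STRING")
     .option("checkpointLocation", "/tmp/checkpoint/random_string")
     .start()
     .awaitTermination())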

(Type something into the nc terminal)

Then consume it:

kcat -C -b localhost:9092 -t 'RANDOM_STRING' -o earliest
hello world
% Reached end of topic RANDOM_STRING [2] at offset 1

Note: Python has native libraries for both TCP socket servers and Kafka producers, so if your code is just forwarding data from one to the other, Spark is unnecessary.
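
For reference, a plain-Python version of that forwarding loop could look roughly like this (it assumes the kafka-python package is installed and reuses the localhost:9092 broker and RANDOM_STRING topic from above):

# socket_to_kafka.py - illustrative sketch; assumes `pip install kafka-python`
import socketserver
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers="localhost:9092")

class ForwardHandler(socketserver.StreamRequestHandler):
    def handle(self):
        # Forward each line received on the socket to the Kafka topic.
        for line in self.rfile:
            producer.send("RANDOM_STRING", line.rstrip(b"\n"))
            producer.flush()

if __name__ == "__main__":
    # Listen on the same port the Spark job used; feed it with: nc localhost 9999
    with socketserver.TCPServer(("localhost", 9999), ForwardHandler) as server:
        server.serve_forever()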
