从Kafka读取数据,并使用Python中的Spark Structured Sreaming打印到控制台



我在Ubuntu 20.04中有kafka_2.13-2.7.0。我运行kafka服务器和zookeeper,然后创建一个主题并通过nc -lk 9999在其中发送一个文本文件。该主题充满了数据。此外,我的系统上有spark-3.0.1-bin-hadoop2.7。事实上,我想使用kafka主题作为python的Spark结构化流的来源。我的代码是这样的:

spark = SparkSession 
.builder 
.appName("APP") 
.getOrCreate()
df = spark 
.readStream 
.format("kafka") 
.option("kafka.bootstrap.servers", "localhost:9092") 
.option("subscribe", "sparktest") 
.option("startingOffsets", "earliest") 
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
df.printSchema()

我通过spark-submit使用以下命令运行上述代码:

./spark-submit --packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.0.1,org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1 /home/spark/PycharmProjects/testSparkStream/KafkaToSpark.py 

代码运行时没有任何异常,我在Spark站点中收到了这个输出:

root
|-- key: binary (nullable = true)
|-- value: binary (nullable = true)
|-- topic: string (nullable = true)
|-- partition: integer (nullable = true)
|-- offset: long (nullable = true)
|-- timestamp: timestamp (nullable = true)
|-- timestampType: integer (nullable = true)

我的问题是,卡夫卡主题充满了数据;但是由于在输出中运行代码,因此没有任何数据。你能告诉我这里出了什么问题吗?

代码不会打印出任何数据,只会为您提供一次架构。

您可以按照通用结构化流媒体指南和结构化流媒体+Kafka集成指南中的说明来了解如何将数据打印到控制台。请记住,在Spark中读取数据是一个惰性操作,没有操作(通常是writeStream操作(什么都做不了。

如果您按照以下方式补充代码,您应该会看到所选数据(键和值(打印到控制台:

spark = SparkSession 
.builder 
.appName("APP") 
.getOrCreate()
df = spark
.readStream 
.format("kafka") 
.option("kafka.bootstrap.servers", "localhost:9092") 
.option("subscribe", "sparktest") 
.option("startingOffsets", "earliest") 
.load()

query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") 
.writeStream 
.format("console") 
.option("checkpointLocation", "path/to/HDFS/dir") 
.start()
query.awaitTermination()

相关内容

  • 没有找到相关文章

最新更新