使用python中的spark结构化流,从来自kafka的json创建一个数据帧



我是spark结构化流媒体的新手,正在研究一个需要在结构化流媒体上实现的poc。

输入来源:卡夫卡输入格式:json语言:python3库:spark 3.2

我正在尝试在预定义结构的spark数据帧中格式化传入的json。

到目前为止,我能够获取json事件,并能够在控制台中获得结果(不是预期的格式(。若你们能把我推向正确的方向或提出解决方案,那个将是非常有帮助的。

以下是到目前为止我的代码。

来自kafka 的json

{"property1" : "hello","property2" : "world"}

structured_kafka.py


"""
Run the script
`$ bin/spark-submit structured_kafka.py 
host1:port1,host2:port2 subscribe topic1,topic2`
"""
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType

if __name__ == "__main__":
if len(sys.argv) != 4:
print("""
Usage: structured_kafka_wordcount.py <bootstrap-servers> <subscribe-type> <topics>
""", file=sys.stderr)
sys.exit(-1)
bootstrapServers = sys.argv[1]
subscribeType = sys.argv[2]
topics = sys.argv[3]
spark = SparkSession
.builder
.appName("StructuredKafkaWordCount")
.getOrCreate()
spark.sparkContext.setLogLevel('WARN')

schema = StructType([ 
StructField("property1", StringType(), True),
StructField("property2" , StringType(), True),
])

lines = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", bootstrapServers)
.option(subscribeType, topics)
.load()
.select(from_json(col("value").cast("string"), schema).alias("parsed_value"))

df = lines.select('*')

# Start running the query that prints the running counts to the console
query = df
.writeStream
.outputMode('Append')
.format('console')
.start()
query.awaitTermination()

输出

Batch: 1
-------------------------------------------
+--------------------+
|        parsed_value|
+--------------------+
|{hello, world}      |
+--------------------+

预期

+--------------------+--------------------+
| property1          | property2          |
+--------------------+--------------------+
|hello               |world               |
+--------------------+---------------------

如果我能得到这种格式的df,我将能够应用我的用例。

请提出建议。

注意:我已经查看了所有现有的解决方案,大多数解决方案要么在scala中,要么不用于结构化流媒体,要么不将kafka作为源。

后一行:

.select(from_json(col("value").cast("string"), schema).alias("parsed_value"))

添加:

.select(col("parsed_value.property1"), col("parsed_value.property2"))

或:

.select(col("parsed_value.*"))