我是spark结构化流媒体的新手,正在研究一个需要在结构化流媒体上实现的poc。
输入来源:卡夫卡输入格式:json语言:python3库:spark 3.2
我正在尝试在预定义结构的spark数据帧中格式化传入的json。
到目前为止,我能够获取json事件,并能够在控制台中获得结果(不是预期的格式(。若你们能把我推向正确的方向或提出解决方案,那个将是非常有帮助的。
以下是到目前为止我的代码。
来自kafka 的json
{"property1" : "hello","property2" : "world"}
structured_kafka.py
"""
Run the script
`$ bin/spark-submit structured_kafka.py
host1:port1,host2:port2 subscribe topic1,topic2`
"""
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType
if __name__ == "__main__":
if len(sys.argv) != 4:
print("""
Usage: structured_kafka_wordcount.py <bootstrap-servers> <subscribe-type> <topics>
""", file=sys.stderr)
sys.exit(-1)
bootstrapServers = sys.argv[1]
subscribeType = sys.argv[2]
topics = sys.argv[3]
spark = SparkSession
.builder
.appName("StructuredKafkaWordCount")
.getOrCreate()
spark.sparkContext.setLogLevel('WARN')
schema = StructType([
StructField("property1", StringType(), True),
StructField("property2" , StringType(), True),
])
lines = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", bootstrapServers)
.option(subscribeType, topics)
.load()
.select(from_json(col("value").cast("string"), schema).alias("parsed_value"))
df = lines.select('*')
# Start running the query that prints the running counts to the console
query = df
.writeStream
.outputMode('Append')
.format('console')
.start()
query.awaitTermination()
输出
Batch: 1
-------------------------------------------
+--------------------+
| parsed_value|
+--------------------+
|{hello, world} |
+--------------------+
预期
+--------------------+--------------------+
| property1 | property2 |
+--------------------+--------------------+
|hello |world |
+--------------------+---------------------
如果我能得到这种格式的df,我将能够应用我的用例。
请提出建议。
注意:我已经查看了所有现有的解决方案,大多数解决方案要么在scala中,要么不用于结构化流媒体,要么不将kafka作为源。
后一行:
.select(from_json(col("value").cast("string"), schema).alias("parsed_value"))
添加:
.select(col("parsed_value.property1"), col("parsed_value.property2"))
或:
.select(col("parsed_value.*"))