I need some help with my first attempt at parsing JSON arriving on Kafka with Spark Structured Streaming.
I am struggling to convert the incoming JSON and flatten it into a flat dataframe for further processing.
My input JSON is
[
{ "siteId": "30:47:47:BE:16:8F", "siteData":
[
{ "dataseries": "trend-255", "values":
[
{"ts": 1502715600, "value": 35.74 },
{"ts": 1502715660, "value": 35.65 },
{"ts": 1502715720, "value": 35.58 },
{"ts": 1502715780, "value": 35.55 }
]
},
{ "dataseries": "trend-256", "values":
[
{"ts": 1502715840, "value": 18.45 },
{"ts": 1502715900, "value": 18.35 },
{"ts": 1502715960, "value": 18.32 }
]
}
]
},
{ "siteId": "30:47:47:BE:16:FF", "siteData":
[
{ "dataseries": "trend-255", "values":
[
{"ts": 1502715600, "value": 35.74 },
{"ts": 1502715660, "value": 35.65 },
{"ts": 1502715720, "value": 35.58 },
{"ts": 1502715780, "value": 35.55 }
]
},
{ "dataseries": "trend-256", "values":
[
{"ts": 1502715840, "value": 18.45 },
{"ts": 1502715900, "value": 18.35 },
{"ts": 1502715960, "value": 18.32 }
]
}
]
}
]
My Spark schema is:
from pyspark.sql.types import ArrayType, StructType, StructField, StringType, IntegerType

data1_spark_schema = ArrayType(
    StructType([
        StructField("siteId", StringType(), False),
        StructField("siteData", ArrayType(StructType([
            StructField("dataseries", StringType(), False),
            StructField("values", ArrayType(StructType([
                StructField("ts", IntegerType(), False),
                StructField("value", StringType(), False)
            ]), False), False)
        ]), False), False)
    ]), False
)
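As a side note, the schema can be sanity-checked against the sample payload on a static DataFrame before wiring up Kafka. Below is a minimal sketch, assuming the sample JSON above is available as a Python string named sample_json (a name introduced here purely for illustration; only a shortened payload is shown):

# Minimal sketch: validate data1_spark_schema against a static sample,
# no Kafka involved. `sample_json` is a hypothetical variable holding
# (a shortened version of) the JSON shown above.
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col

from schemas.schema import data1_spark_schema

spark = SparkSession.builder.appName("schema_check").getOrCreate()

sample_json = """[{"siteId": "30:47:47:BE:16:8F", "siteData":
  [{"dataseries": "trend-255", "values": [{"ts": 1502715600, "value": 35.74}]}]}]"""

# Wrap the raw string in a one-row DataFrame and parse it with the schema;
# a null result from from_json would point to a schema/payload mismatch.
df = spark.createDataFrame([(sample_json,)], ["bms_data1"])
parsed = df.select(from_json(col("bms_data1"), data1_spark_schema).alias("bms_data1"))
parsed.printSchema()
parsed.show(truncate=False)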
My very simple code is:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

from config.general import kafka_instance
from config.general import topic
from schemas.schema import data1_spark_schema

spark = SparkSession \
    .builder \
    .appName("Structured_BMS_Feed") \
    .getOrCreate()

stream = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_instance) \
    .option("subscribe", topic) \
    .option("startingOffsets", "latest") \
    .option("max.poll.records", 100) \
    .option("failOnDataLoss", False) \
    .load()

stream_records = stream.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING) as bms_data1") \
    .select(from_json("bms_data1", data1_spark_schema).alias("bms_data1"))

sites = stream_records.select(explode("bms_data1").alias("site")) \
    .select("site.*")

sites.printSchema()

stream_debug = sites.writeStream \
    .outputMode("append") \
    .format("console") \
    .option("numRows", 20) \
    .option("truncate", False) \
    .start()

stream_debug.awaitTermination()
When I run this code, my schema is printed like this:
root
|-- siteId: string (nullable = false)
|-- siteData: array (nullable = false)
| |-- element: struct (containsNull = false)
| | |-- dataseries: string (nullable = false)
| | |-- values: array (nullable = false)
| | | |-- element: struct (containsNull = false)
| | | | |-- ts: integer (nullable = false)
| | | | |-- value: string (nullable = false)
Is it possible to get all the fields in a flat dataframe instead of nested JSON? In other words, for every ts/value pair it should give me one row that also carries its parent dataseries and siteId.
Answering my own question. I managed to flatten it using the following lines:
sites_flat = stream_records.select(explode("bms_data1").alias("site")) \
    .select("site.siteId", explode("site.siteData").alias("siteData")) \
    .select("siteId", "siteData.dataseries", explode("siteData.values").alias("values")) \
    .select("siteId", "dataseries", "values.*")