Parsing JSON in Event Hub messages using Spark Streaming



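I am trying to parse JSON messages streamed through EventHub. I convert the message body to a string and then apply from_json, as shown below. I can save the whole JSON object as a single cell in a delta table (this is what happens when I write the stream from df4 in the code below), but when I use body.* or col(body.*) to split the JSON into multiple columns, I get an error. Any suggestions on how to handle this?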

// Scala Code //
val incomingStream = spark.readStream.format("eventhubs").options(customEventhubParameters.toMap).load()
incomingStream.printSchema()
val outputStream = incomingStream.select($"body".cast(StringType)).alias("body")

val df = outputStream.toDF()
val df4=df.select(from_json(col("body"),jsonSchema))
val df5=df4.select("body.*")
df5.writeStream
.format("delta")
.outputMode("append")
.option("ignoreChanges", "true")
.option("checkpointLocation", "/mnt/abc/checkpoints/samplestream")
.start("/mnt/abc/samplestream")

Output

root
|-- body: binary (nullable = true)
|-- partition: string (nullable = true)
|-- offset: string (nullable = true)
|-- sequenceNumber: long (nullable = true)
|-- enqueuedTime: timestamp (nullable = true)
|-- publisher: string (nullable = true)
|-- partitionKey: string (nullable = true)
|-- properties: map (nullable = true)
|    |-- key: string
|    |-- value: string (valueContainsNull = true)
|-- systemProperties: map (nullable = true)
|    |-- key: string
|    |-- value: string (valueContainsNull = true)
root
|-- body: string (nullable = true)
AnalysisException: cannot resolve 'body.*' given input columns 'body'
at org.apache.spark.sql.catalyst.analysis.UnresolvedStarBase.expand(unresolved.scala:416)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.$anonfun$expand$1(Analyzer.scala:2507)
at org.apache.spark.sql.catalyst.analysis.package$.withPosition(package.scala:53)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveReferences$$expand(Analyzer.scala:2506)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.$anonfun$buildExpandedProjectList$1(Analyzer.scala:2526)
at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:245)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at scala.collection.TraversableLike.flatMap(TraversableLike.scala:245)
at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:242)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:108)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.buildExpandedProjectList(Analyzer.scala:2524)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$18.applyOrElse(Analyzer.scala:2238)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$18.applyOrElse(Analyzer.scala:2233)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$3(AnalysisHelper.scala:137)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:86)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$1(AnalysisHelper.scala:137)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:340)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning(AnalysisHelper.scala:133)

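The link below shows how to display the result on the console, which works for me. What I am trying to do is write the JSON to a delta file with multiple columns.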

[https://stackoverflow.com/questions/57298849/parsing-event-hub-messages-using-spark-streaming]

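The problem with the code appears to be how alias is used, which leaves the column body no longer available. Here are some observations about your use of aliases, and ways you might address each one:

Observation 1

Problem: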

val outputStream = incomingStream.select($"body".cast(StringType)).alias("body")

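The code above aliases the entire DataFrame rather than the column. If your intention is to make sure the body column is still named body after the cast to string, you could try the following.

Suggestion: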

val outputStream = incomingStream.select($"body".cast(StringType).alias("body"))
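To make the difference between the two alias calls visible, here is a minimal sketch on a static DataFrame rather than a stream. It assumes a local SparkSession and uses a hypothetical alias name, payload, that is not in the original code, so that the effect shows up in the column names.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StringType

// Assumption: a local session for illustration; a notebook usually provides `spark` already.
val spark = SparkSession.builder().master("local[1]").appName("alias-sketch").getOrCreate()
import spark.implicits._

// Stand-in for the Event Hubs stream: a single binary `body` column.
val incoming = Seq("""{"id":1}""".getBytes("UTF-8")).toDF("body")

// alias on the result of select names the whole Dataset; the column keeps its name.
val datasetAliased = incoming.select($"body".cast(StringType)).alias("payload")

// alias inside select renames the column itself.
val columnAliased = incoming.select($"body".cast(StringType).alias("payload"))

datasetAliased.printSchema()
columnAliased.printSchema()
```

Only the second form changes the column name, which is why the placement of the closing parenthesis matters.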

Observation 2

Problem:

val df4=df.select(from_json(col("body"),jsonSchema))

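You should give the result an alias so that you can reference it later, because the parsed column currently carries a different, auto-generated name (you can see this for yourself with printSchema or show while debugging).

Suggestion: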
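As a quick way to check this while debugging, the sketch below prints the column names of an unaliased from_json result on a static DataFrame. The jsonSchema here is a hypothetical one-field schema, since the original schema is not shown, and the local SparkSession is an assumption.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Assumption: a local session for illustration; a notebook usually provides `spark` already.
val spark = SparkSession.builder().master("local[1]").appName("from-json-name-sketch").getOrCreate()
import spark.implicits._

// Hypothetical schema; replace with the real jsonSchema.
val jsonSchema = StructType(Seq(StructField("deviceId", StringType)))

val df = Seq("""{"deviceId":"a1"}""").toDF("body")

// No alias: Spark generates a name for the struct column, and it is not "body".
val unaliased = df.select(from_json(col("body"), jsonSchema))

println(unaliased.columns.mkString(", "))
unaliased.printSchema()
```

The exact generated name depends on the Spark version, so it is safer to set the alias explicitly than to rely on it.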

val df4=df.select(from_json(col("body"),jsonSchema).alias("body"))
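Putting both suggestions together, the following sketch runs the cast, the parse, and the body.* expansion on a static DataFrame so the resulting columns can be inspected before wiring up writeStream. The schema fields (deviceId, temperature), the sample message, and the local SparkSession are assumptions for illustration, not part of the original code.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Assumption: a local session for illustration; a notebook usually provides `spark` already.
val spark = SparkSession.builder().master("local[1]").appName("body-star-sketch").getOrCreate()
import spark.implicits._

// Hypothetical schema; replace with the real jsonSchema.
val jsonSchema = StructType(Seq(
  StructField("deviceId", StringType),
  StructField("temperature", IntegerType)
))

// Stand-in for the Event Hubs stream: a single binary `body` column.
val raw = Seq("""{"deviceId":"a1","temperature":21}""".getBytes("UTF-8")).toDF("body")

// Observation 1: alias the column, not the Dataset.
val asString = raw.select($"body".cast(StringType).alias("body"))

// Observation 2: alias the parsed struct so it can be referenced as `body`.
val parsed = asString.select(from_json(col("body"), jsonSchema).alias("body"))

// With the struct named `body`, the star expansion resolves to its fields.
val flattened = parsed.select("body.*")
flattened.printSchema()
```

The same select chain should carry over to the streaming DataFrame, since these column expressions do not depend on whether the source is static or streaming.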
