PySpark:反序列化eventhub捕获Avro文件中包含的Avro序列化消息



初始情况

AVRO序列化的事件被发送到azure事件中心。这些事件使用azure事件中心捕获功能持久存储。捕获的数据以及事件中心元数据都是用ApacheAvro格式编写的。捕获avro文件中包含的原始事件应使用(py(Spark进行分析。


问题

如何使用(py(Spark反序列化包含在AVRO文件的字段/列中的AVRO序列化事件?(注释:阅读器应用程序不知道事件的avro模式,但它作为avro标头包含在消息中(


背景

背景是物联网场景的分析平台。消息由运行在kafka上的物联网平台提供。为了更灵活地处理模式更改,战略决策是坚持使用avro格式。为了启用Azure流分析(ASA(的使用,为每条消息指定avro架构(否则ASA无法反序列化该消息(。

捕获文件avro模式

事件中心捕获功能生成的avro文件的模式如下所示:

{
"type":"record",
"name":"EventData",
"namespace":"Microsoft.ServiceBus.Messaging",
"fields":[
{"name":"SequenceNumber","type":"long"},
{"name":"Offset","type":"string"},
{"name":"EnqueuedTimeUtc","type":"string"},
{"name":"SystemProperties","type":{"type":"map","values":["long","double","string","bytes"]}},
{"name":"Properties","type":{"type":"map","values":["long","double","string","bytes"]}},
{"name":"Body","type":["null","bytes"]}
]
}

(注意,实际消息以字节形式存储在正文字段中(

示例事件avro模式

为了便于说明,我将具有以下avro模式的事件发送到事件中心:

{
"type" : "record",
"name" : "twitter_schema",
"namespace" : "com.test.avro",
"fields" : [ 
{"name" : "username","type" : "string"}, 
{"name" : "tweet","type" : "string"},
{"name" : "timestamp","type" : "long"}
],
}

示例事件

{
"username": "stackoverflow",
"tweet": "please help deserialize me",
"timestamp": 1366150681
}

示例avro消息有效载荷

(编码为字符串/注意包括avro模式(

Objavro.schema�{"type":"record","name":"twitter_schema","namespace":"com.test.avro","fields":[{"name":"username","type":"string"},{"name":"tweet","type":"string"},{"name":"timestamp","type":"long"}]}

因此,在最后,该有效载荷将以字节的形式存储在捕获avro文件的"Body"字段中。



我当前的方法

为了便于使用,测试和调试,我目前使用的是pyspark jupyter笔记本电脑。

Spark会话的配置:

%%configure
{
"conf": {
"spark.jars.packages": "com.databricks:spark-avro_2.11:4.0.0"
}
}

将avro文件读取到数据帧中并输出结果:

capture_df = spark.read.format("com.databricks.spark.avro").load("[pathToCaptureAvroFile]")
capture_df.show()

结果:

+--------------+------+--------------------+----------------+----------+--------------------+
|SequenceNumber|Offset|     EnqueuedTimeUtc|SystemProperties|Properties|                Body|
+--------------+------+--------------------+----------------+----------+--------------------+
|            71|  9936|11/4/2018 4:59:54 PM|           Map()|     Map()|[4F 62 6A 01 02 1...|
|            72| 10448|11/4/2018 5:00:01 PM|           Map()|     Map()|[4F 62 6A 01 02 1...|

获取Body字段的内容并将其强制转换为字符串:

msgRdd = capture_df.select(capture_df.Body.cast("string")).rdd.map(lambda x: x[0])

这就是我让代码工作的程度。花了很多时间试图反序列化实际消息,但没有成功。如果有任何帮助,我将不胜感激!

一些附加信息:Spark正在Microsoft Azure HDInsight 3.6群集上运行。Spark版本是2.2。Python版本是2.7.12。

您要做的是将.decode('utf-8')应用于Body列中的每个元素。你必须从decode创建一个UDF,这样你才能应用它

from pyspark.sql import functions as f
decodeElements = f.udf(lambda a: a.decode('utf-8'))

以下是将IoT Hub存储的avro文件解析到自定义Blob存储端点的完整示例:

storage_account_name = "<YOUR STORACE ACCOUNT NAME>"
storage_account_access_key = "<YOUR STORAGE ACCOUNT KEY>"
# Read all files from one day. All PartitionIds are included. 
file_location = "wasbs://<CONTAINER>@"+storage_account_name+".blob.core.windows.net/<IoT Hub Name>/*/2018/11/30/*/*"
file_type = "avro"
# Read raw data
spark.conf.set(
"fs.azure.account.key."+storage_account_name+".blob.core.windows.net",
storage_account_access_key)
reader = spark.read.format(file_type).option("inferSchema", "true")
raw = reader.load(file_location)
# Decode Body into strings
from pyspark.sql import functions as f
decodeElements = f.udf(lambda a: a.decode('utf-8'))
jsons = raw.select(
raw['EnqueuedTimeUtc'],
raw['SystemProperties.connectionDeviceId'].alias('DeviceId'), 
decodeElements(raw['Body']).alias("Json")
)
# Parse Json data
from pyspark.sql.functions import from_json
json_schema = spark.read.json(jsons.rdd.map(lambda row: row.Json)).schema
data = jsons.withColumn('Parsed', from_json('Json', json_schema)).drop('Json')

免责声明:我是Python和Databricks的新手,我的解决方案可能不太完美。但我花了一天多的时间来完成这项工作,我希望这能成为一个很好的起点。

我想你也可以做一些类似的事情:

jsonRdd = raw.select(raw.Body.cast("string"))

我遇到了同样的问题。

Spark 2.4版本为我解决了这个问题。

您可以在此处找到文档:https://databricks.com/blog/2018/11/30/apache-avro-as-a-built-in-data-source-in-apache-spark-2-4.html

备注:你需要知道你的AVRO文件是什么样子才能创建你的模式(他们只是在这里加载它(。

缺点是:它目前只能在Scala和Java中使用。据我所知,这在Python中还不可能。

最新更新