初始情况
AVRO序列化的事件被发送到azure事件中心。这些事件使用azure事件中心捕获功能持久存储。捕获的数据以及事件中心元数据都是用ApacheAvro格式编写的。捕获avro文件中包含的原始事件应使用(py(Spark进行分析。
问题
如何使用(py(Spark反序列化包含在AVRO文件的字段/列中的AVRO序列化事件?(注释:阅读器应用程序不知道事件的avro模式,但它作为avro标头包含在消息中(
背景
背景是物联网场景的分析平台。消息由运行在kafka上的物联网平台提供。为了更灵活地处理模式更改,战略决策是坚持使用avro格式。为了启用Azure流分析(ASA(的使用,为每条消息指定avro架构(否则ASA无法反序列化该消息(。
捕获文件avro模式
事件中心捕获功能生成的avro文件的模式如下所示:
{
"type":"record",
"name":"EventData",
"namespace":"Microsoft.ServiceBus.Messaging",
"fields":[
{"name":"SequenceNumber","type":"long"},
{"name":"Offset","type":"string"},
{"name":"EnqueuedTimeUtc","type":"string"},
{"name":"SystemProperties","type":{"type":"map","values":["long","double","string","bytes"]}},
{"name":"Properties","type":{"type":"map","values":["long","double","string","bytes"]}},
{"name":"Body","type":["null","bytes"]}
]
}
(注意,实际消息以字节形式存储在正文字段中(
示例事件avro模式
为了便于说明,我将具有以下avro模式的事件发送到事件中心:
{
"type" : "record",
"name" : "twitter_schema",
"namespace" : "com.test.avro",
"fields" : [
{"name" : "username","type" : "string"},
{"name" : "tweet","type" : "string"},
{"name" : "timestamp","type" : "long"}
],
}
示例事件
{
"username": "stackoverflow",
"tweet": "please help deserialize me",
"timestamp": 1366150681
}
示例avro消息有效载荷
(编码为字符串/注意包括avro模式(
Objavro.schema�{"type":"record","name":"twitter_schema","namespace":"com.test.avro","fields":[{"name":"username","type":"string"},{"name":"tweet","type":"string"},{"name":"timestamp","type":"long"}]}
因此,在最后,该有效载荷将以字节的形式存储在捕获avro文件的"Body"字段中。
。
我当前的方法
为了便于使用,测试和调试,我目前使用的是pyspark jupyter笔记本电脑。
Spark会话的配置:
%%configure
{
"conf": {
"spark.jars.packages": "com.databricks:spark-avro_2.11:4.0.0"
}
}
将avro文件读取到数据帧中并输出结果:
capture_df = spark.read.format("com.databricks.spark.avro").load("[pathToCaptureAvroFile]")
capture_df.show()
结果:
+--------------+------+--------------------+----------------+----------+--------------------+
|SequenceNumber|Offset| EnqueuedTimeUtc|SystemProperties|Properties| Body|
+--------------+------+--------------------+----------------+----------+--------------------+
| 71| 9936|11/4/2018 4:59:54 PM| Map()| Map()|[4F 62 6A 01 02 1...|
| 72| 10448|11/4/2018 5:00:01 PM| Map()| Map()|[4F 62 6A 01 02 1...|
获取Body字段的内容并将其强制转换为字符串:
msgRdd = capture_df.select(capture_df.Body.cast("string")).rdd.map(lambda x: x[0])
这就是我让代码工作的程度。花了很多时间试图反序列化实际消息,但没有成功。如果有任何帮助,我将不胜感激!
一些附加信息:Spark正在Microsoft Azure HDInsight 3.6群集上运行。Spark版本是2.2。Python版本是2.7.12。
您要做的是将.decode('utf-8')
应用于Body列中的每个元素。你必须从decode创建一个UDF,这样你才能应用它
from pyspark.sql import functions as f
decodeElements = f.udf(lambda a: a.decode('utf-8'))
以下是将IoT Hub存储的avro文件解析到自定义Blob存储端点的完整示例:
storage_account_name = "<YOUR STORACE ACCOUNT NAME>"
storage_account_access_key = "<YOUR STORAGE ACCOUNT KEY>"
# Read all files from one day. All PartitionIds are included.
file_location = "wasbs://<CONTAINER>@"+storage_account_name+".blob.core.windows.net/<IoT Hub Name>/*/2018/11/30/*/*"
file_type = "avro"
# Read raw data
spark.conf.set(
"fs.azure.account.key."+storage_account_name+".blob.core.windows.net",
storage_account_access_key)
reader = spark.read.format(file_type).option("inferSchema", "true")
raw = reader.load(file_location)
# Decode Body into strings
from pyspark.sql import functions as f
decodeElements = f.udf(lambda a: a.decode('utf-8'))
jsons = raw.select(
raw['EnqueuedTimeUtc'],
raw['SystemProperties.connectionDeviceId'].alias('DeviceId'),
decodeElements(raw['Body']).alias("Json")
)
# Parse Json data
from pyspark.sql.functions import from_json
json_schema = spark.read.json(jsons.rdd.map(lambda row: row.Json)).schema
data = jsons.withColumn('Parsed', from_json('Json', json_schema)).drop('Json')
免责声明:我是Python和Databricks的新手,我的解决方案可能不太完美。但我花了一天多的时间来完成这项工作,我希望这能成为一个很好的起点。
我想你也可以做一些类似的事情:
jsonRdd = raw.select(raw.Body.cast("string"))
我遇到了同样的问题。
Spark 2.4版本为我解决了这个问题。
您可以在此处找到文档:https://databricks.com/blog/2018/11/30/apache-avro-as-a-built-in-data-source-in-apache-spark-2-4.html
备注:你需要知道你的AVRO文件是什么样子才能创建你的模式(他们只是在这里加载它(。
缺点是:它目前只能在Scala和Java中使用。据我所知,这在Python中还不可能。