Load only the struct from a map's values in an Avro file into a Spark DataFrame



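Using PySpark, I need to load the "Properties" object (the values of a map) from an Avro file into its own Spark DataFrame, so that "Properties" from my Avro file becomes a DataFrame with its elements and values as columns and rows. I have had a hard time finding clear examples of how to do this.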

File schema:

root
|-- SequenceNumber: long (nullable = true)
|-- Offset: string (nullable = true)
|-- EnqueuedTimeUtc: string (nullable = true)
|-- SystemProperties: map (nullable = true)
|    |-- key: string
|    |-- value: struct (valueContainsNull = true)
|    |    |-- member0: long (nullable = true)
|    |    |-- member1: double (nullable = true)
|    |    |-- member2: string (nullable = true)
|    |    |-- member3: binary (nullable = true)
|-- Properties: map (nullable = true)
|    |-- key: string
|    |-- value: struct (valueContainsNull = true)
|    |    |-- member0: long (nullable = true)
|    |    |-- member1: double (nullable = true)
|    |    |-- member2: string (nullable = true)
|    |    |-- member3: binary (nullable = true)
|-- Body: binary (nullable = true)

The resulting "Properties" DataFrame loaded from the Avro file above needs to look like this:

member0 | member1 | member2 | member3

map_values is your friend.

Collection function: Returns an unordered array containing the values of the map.
New in version 2.3.0.

df_properties = df.select((F.map_values(F.col('Properties'))[0]).alias('vals')).select('vals.*')

Full example:

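from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# One-row DataFrame whose Properties map has the same value struct as the Avro file
df = spark.createDataFrame(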
[('a', 20, 4.5, 'r', b'8')],
['key', 'member0', 'member1', 'member2', 'member3'])
df = df.select(F.create_map('key', F.struct('member0', 'member1', 'member2', 'member3')).alias('Properties'))
df.printSchema()
# root
#  |-- Properties: map (nullable = false)
#  |    |-- key: string
#  |    |-- value: struct (valueContainsNull = false)
#  |    |    |-- member0: long (nullable = true)
#  |    |    |-- member1: double (nullable = true)
#  |    |    |-- member2: string (nullable = true)
#  |    |    |-- member3: binary (nullable = true)
df_properties = df.select((F.map_values(F.col('Properties'))[0]).alias('vals')).select('vals.*')
df_properties.show()
# +-------+-------+-------+-------+
# |member0|member1|member2|member3|
# +-------+-------+-------+-------+
# |     20|    4.5|      r|   [38]|
# +-------+-------+-------+-------+
df_properties.printSchema()
# root
#  |-- member0: long (nullable = true)
#  |-- member1: double (nullable = true)
#  |-- member2: string (nullable = true)
#  |-- member3: binary (nullable = true)
