Using PySpark, I need to load the "Properties" object (the values of a map) from an avro file into its own Spark dataframe. That way, "Properties" from my avro file would become a dataframe whose elements and values are the columns and rows. However, I am struggling to find clear examples of how to accomplish this.
The file schema:
root
|-- SequenceNumber: long (nullable = true)
|-- Offset: string (nullable = true)
|-- EnqueuedTimeUtc: string (nullable = true)
|-- SystemProperties: map (nullable = true)
| |-- key: string
| |-- value: struct (valueContainsNull = true)
| | |-- member0: long (nullable = true)
| | |-- member1: double (nullable = true)
| | |-- member2: string (nullable = true)
| | |-- member3: binary (nullable = true)
|-- Properties: map (nullable = true)
| |-- key: string
| |-- value: struct (valueContainsNull = true)
| | |-- member0: long (nullable = true)
| | |-- member1: double (nullable = true)
| | |-- member2: string (nullable = true)
| | |-- member3: binary (nullable = true)
|-- Body: binary (nullable = true)
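For reference, a minimal sketch of how such a file can be read (the path below is a placeholder, not from the original post; the "avro" format is built into Spark 2.4+ and requires the spark-avro package on older versions):
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
# "/path/to/capture.avro" is a hypothetical placeholder path.
df = spark.read.format("avro").load("/path/to/capture.avro")
df.printSchema()  # prints the schema shown above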
The resulting "Properties" dataframe loaded from the above avro file needs to have the struct fields as its columns:
|member0|member1|member2|member3|
map_values is your friend. From the PySpark documentation:
Collection function: Returns an unordered array containing the values of the map.
New in version 2.3.0.
df_properties = df.select((F.map_values(F.col('Properties'))[0]).alias('vals')).select('vals.*')
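Note that map_values returns an unordered array, and the [0] above keeps only its first element, which works when every row's map holds exactly one entry. If a Properties map can contain several entries, one alternative (a sketch, not part of the original answer) is to explode the array so each value becomes its own row:
# One output row per map value, instead of only the first array element.
df_properties = df.select(F.explode(F.map_values('Properties')).alias('vals')).select('vals.*')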
Full example:
import pyspark.sql.functions as F  # assumes an active SparkSession bound to `spark`

df = spark.createDataFrame(
    [('a', 20, 4.5, 'r', b'8')],
    ['key', 'member0', 'member1', 'member2', 'member3'])
# Rebuild a map<string, struct> column matching the "Properties" field in the question.
df = df.select(F.create_map('key', F.struct('member0', 'member1', 'member2', 'member3')).alias('Properties'))
df.printSchema()
# root
# |-- Properties: map (nullable = false)
# | |-- key: string
# | |-- value: struct (valueContainsNull = false)
# | | |-- member0: long (nullable = true)
# | | |-- member1: double (nullable = true)
# | | |-- member2: string (nullable = true)
# | | |-- member3: binary (nullable = true)
df_properties = df.select((F.map_values(F.col('Properties'))[0]).alias('vals')).select('vals.*')
df_properties.show()
# +-------+-------+-------+-------+
# |member0|member1|member2|member3|
# +-------+-------+-------+-------+
# | 20| 4.5| r| [38]|
# +-------+-------+-------+-------+
df_properties.printSchema()
# root
# |-- member0: long (nullable = true)
# |-- member1: double (nullable = true)
# |-- member2: string (nullable = true)
# |-- member3: binary (nullable = true)
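If the map keys should survive into the result as well, a variation on the same idea (again a sketch, reusing the df built above) explodes the map itself, which yields key and value columns:
# Each key/value pair of the map becomes one row; the struct is then flattened.
df_kv = df.select(F.explode('Properties')).select('key', 'value.*')
df_kv.show()
# +---+-------+-------+-------+-------+
# |key|member0|member1|member2|member3|
# +---+-------+-------+-------+-------+
# |  a|     20|    4.5|      r|   [38]|
# +---+-------+-------+-------+-------+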