我有数据帧,它有两组列 info.name 和info.value:
id |info.name.1|info.name.2|info.name.3|info.value.1|info.value.2|info.value.3|
-------------------------------------------------------------------------------------
1 |amount |currency |action |10 |USD |add |
2 |amount |currency |action |100 |EUR |transfer |
3 |amount |currency |action |2000 |GBP |add |
我的目标是将它们收集到 name:value 对中,并创建包含信息字典的单列:
id |info |
-----------------------------------------------------------|
1 |{amount : 10, currency : USD, action: add} |
2 |{amount : 100, currency : EUR, action: transfer} |
3 |{amount : 2000, currency : GBP, action: add} |
感谢您的建议和帮助。
谢谢。
这是一个可能的解决方案。
让我们创建一些要使用的数据:
data = [
('A', 'B', 10, 100),
('C', 'D', 12, 20),
('A', 'D', 30, 0)
]
schema = T.StructType([
T.StructField('KEY_1', T.StringType()),
T.StructField('KEY_2', T.StringType()),
T.StructField('VAL_1', T.IntegerType()),
T.StructField('VAL_2', T.IntegerType())
])
df = spark.createDataFrame(data, schema)
df.show()
+-----+-----+-----+-----+
|KEY_1|KEY_2|VAL_1|VAL_2|
+-----+-----+-----+-----+
| A| B| 10| 100|
| C| D| 12| 20|
| A| D| 30| 0|
+-----+-----+-----+-----+
以下是描述您打算执行的转换的逻辑:
import pyspark.sql.functions as F
from itertools import groupby
from functools import reduce
from pyspark.sql import DataFrame
fields = [f.name for f in df.schema.fields]
fsort = lambda x: x.split('_')[1]
grouped = groupby(sorted(fields, key=fsort), key=fsort)
dfs = [
df.select(F.create_map(F.col(key), F.col(value)).alias('map_values'))
for group, (key, value) in grouped
]
df = reduce(DataFrame.union, dfs)
df.show()
+----------+
|map_values|
+----------+
| [A -> 10]|
| [C -> 12]|
| [A -> 30]|
|[B -> 100]|
| [D -> 20]|
| [D -> 0]|
+----------+