我遇到的数据是这样的:
req_id dsp_price_style
0 "1000:10,1001:100,1002:5,1003:7"
1 "1002:5,1000:100,1001:15,1003:6"
字段"dsp_price_style"的值格式为 dsp_id_0:price_0,dsp_id_1:price_1,.....,dsp_id_n:price_n 虽然它们没有在dsp_id之间排序,但我只需要dsp_id的数据是"1000",dsp_id是"1001"及其价格,并使用 pyspark 将这 4 个数据添加为新列。
req_id dsp_0 price_0 dsp_1 price_1
0 "1000" "10" "1001" "100"
1 "1000" "100" "1001" "15"
如何在 pyspark 中以最佳性能实现此功能?
我这里有一种方法。
data = [
[0,"1000:10,1001:100,1002:5,1003:7"],
[1,"1002:5,1000:100,1001:15,1003:6"]
]
data_sdf = spark.createDataFrame(data, ['req_id', 'dsp_price_style'])
# the function to split
def split_dsp_price(row):
ls_x = row.dsp_price_style.split(sep=',')
return [row.req_id] + reduce(lambda x, y: x + y, [k.split(sep=':') for k in ls_x if k[:4] in ['1000', '1001']])
fnl_data_rdd = data_sdf.rdd.map(lambda r: split_dsp_price(r))
fnl_data_rdd.take(2)
# [[0, '1000', '10', '1001', '100'], [1, '1000', '100', '1001', '15']]
这可以进一步转换为DataFrame
或进一步处理。