This is a modified version of a question that was posted here earlier.
I am using PySpark (Spark 1.6).
I have the following data:
myDict
{'1': 'c1', '3': 'c3', '2': 'c2', '5': 'c5', '4': 'c4', '6': 'c6'}
and I have the following df:
+----+----+---------+---+---+---+---+---+---+
|user|item|fav_items| c1| c2| c3| c4| c5| c6|
+----+----+---------+---+---+---+---+---+---+
| u1| 1| 1,1,3| 0| 0| 0| 0| 0| 0|
| u1| 4|4,4,4,5,6| 0| 0| 0| 0| 0| 0|
+----+----+---------+---+---+---+---+---+---+
The expected output is:
+----+----+---------+---+---+---+---+---+---+
|user|item|fav_items| c1| c2| c3| c4| c5| c6|
+----+----+---------+---+---+---+---+---+---+
| u1| 1| 1,1,3| 2| 0| 1| 0| 0| 0|
| u1| 4|4,4,4,5,6| 0| 0| 0| 3| 1| 1|
+----+----+---------+---+---+---+---+---+---+
Based on the counts of the values in fav_items, look up myDict to get the column mapping and update that column. For example, in the first row the value 1 occurs twice, and myDict maps '1' to c1, so c1 should be 2 for the first row.
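To make the intended per-row logic concrete, here is a minimal plain-Python sketch of what a single row's counts should look like (this is just an illustration of the requirement, reusing the myDict above):

from collections import Counter

counts = Counter('1,1,3'.split(','))                      # {'1': 2, '3': 1}
row_updates = {myDict[k]: v for k, v in counts.items()}
print(row_updates)                                        # {'c1': 2, 'c3': 1}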
I came up with the following approach, where we iterate over the keys and build one column at a time, but it is inefficient because the number of columns exceeds 2k: each withColumn call adds another projection (and another Python UDF) to the plan.
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

for key in myDict.keys():
    contains_event = udf(lambda x, k=key: x.count(k), IntegerType())
    df = df.withColumn(myDict[key], contains_event('fav_items'))
I am looking for a more efficient way to solve this problem.
Thanks in advance.
Just tried it in my own way, hope it helps.
>>> from pyspark.sql.types import *
>>> from pyspark.sql import functions as F
>>> from collections import Counter
>>> d = {'1': 'c1', '3': 'c3', '2': 'c2', '5': 'c5', '4': 'c4', '6': 'c6'}
>>> df = spark.createDataFrame([('u1',1,'1,1,3',0,0,0,0,0,0),('u1',4,'4,4,4,5,6',0,0,0,0,0,0),('u1',1,'3,6,2',0,0,0,0,0,0)],['user','item','fav_items','c1','c2','c3','c4','c5','c6'])
>>> df.show()
+----+----+---------+---+---+---+---+---+---+
|user|item|fav_items| c1| c2| c3| c4| c5| c6|
+----+----+---------+---+---+---+---+---+---+
| u1| 1| 1,1,3| 0| 0| 0| 0| 0| 0|
| u1| 4|4,4,4,5,6| 0| 0| 0| 0| 0| 0|
| u1| 1| 3,6,2| 0| 0| 0| 0| 0| 0|
+----+----+---------+---+---+---+---+---+---+
>>> udf1 = F.udf(lambda c: Counter(c).most_common(),ArrayType(ArrayType(StringType())))
>>> df1 = df.select('user','item','fav_items',udf1(F.split(df.fav_items,',')).alias('item_counter'))
>>> df1.show(3,False)
+----+----+---------+------------------------------------------------------------+
|user|item|fav_items|item_counter |
+----+----+---------+------------------------------------------------------------+
|u1 |1 |1,1,3 |[WrappedArray(1, 2), WrappedArray(3, 1)] |
|u1 |4 |4,4,4,5,6|[WrappedArray(4, 3), WrappedArray(5, 1), WrappedArray(6, 1)]|
|u1 |1 |3,6,2 |[WrappedArray(3, 1), WrappedArray(6, 1), WrappedArray(2, 1)]|
+----+----+---------+------------------------------------------------------------+
>>> df2 = df1.select('user','item','fav_items',F.explode(df1.item_counter).alias('val'))
>>> df2 = df2.select('user','item','fav_items','val',df2.val[0].alias('val1'),df2.val[1].alias('val2'))
>>> df2.show()
+----+----+---------+------+----+----+
|user|item|fav_items| val|val1|val2|
+----+----+---------+------+----+----+
| u1| 1| 1,1,3|[1, 2]| 1| 2|
| u1| 1| 1,1,3|[3, 1]| 3| 1|
| u1| 4|4,4,4,5,6|[4, 3]| 4| 3|
| u1| 4|4,4,4,5,6|[5, 1]| 5| 1|
| u1| 4|4,4,4,5,6|[6, 1]| 6| 1|
| u1| 1| 3,6,2|[3, 1]| 3| 1|
| u1| 1| 3,6,2|[6, 1]| 6| 1|
| u1| 1| 3,6,2|[2, 1]| 2| 1|
+----+----+---------+------+----+----+
>>> udf2 = F.udf(lambda x : d[x],StringType())
>>> df2 = df2.withColumn('d_col',udf2(df2.val1))
>>> df2.show()
+----+----+---------+------+----+----+-----+
|user|item|fav_items| val|val1|val2|d_col|
+----+----+---------+------+----+----+-----+
| u1| 1| 1,1,3|[1, 2]| 1| 2| c1|
| u1| 1| 1,1,3|[3, 1]| 3| 1| c3|
| u1| 4|4,4,4,5,6|[4, 3]| 4| 3| c4|
| u1| 4|4,4,4,5,6|[5, 1]| 5| 1| c5|
| u1| 4|4,4,4,5,6|[6, 1]| 6| 1| c6|
| u1| 1| 3,6,2|[3, 1]| 3| 1| c3|
| u1| 1| 3,6,2|[6, 1]| 6| 1| c6|
| u1| 1| 3,6,2|[2, 1]| 2| 1| c2|
+----+----+---------+------+----+----+-----+
>>> pvtdf = df2.groupby(['user','item','fav_items']).pivot('d_col').agg(F.first('val2')).na.fill({'c1':0,'c2':0,'c3':0,'c4':0,'c5':0,'c6':0})
>>> pvtdf.show()
+----+----+---------+---+---+---+---+---+---+
|user|item|fav_items| c1| c2| c3| c4| c5| c6|
+----+----+---------+---+---+---+---+---+---+
| u1| 1| 1,1,3| 2| 0| 1| 0| 0| 0|
| u1| 1| 3,6,2| 0| 1| 1| 0| 0| 1|
| u1| 4|4,4,4,5,6| 0| 0| 0| 3| 1| 1|
+----+----+---------+---+---+---+---+---+---+
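As a follow-up thought: if the main goal is just to avoid 2k+ withColumn calls, another option is a single UDF that returns all the counts as one array, followed by a single select. This is only a sketch reusing the d and df defined above (keys, count_all, and df3 are illustrative names, not from the run above):

>>> keys = sorted(d.keys())
>>> count_all = F.udf(lambda s: [Counter(s.split(',')).get(k, 0) for k in keys], ArrayType(IntegerType()))
>>> df3 = df.withColumn('cnts', count_all('fav_items'))
>>> df3 = df3.select('user', 'item', 'fav_items', *[df3.cnts[i].alias(d[k]) for i, k in enumerate(keys)])

df3 should match pvtdf above, and the whole job stays at two projections regardless of how many keys are in d.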