如何按多列分组并在Pyspark中的列表中收集



这是我的问题:我有这个RDD:

a = [[u'PNR1',u'TKT1',u'TEST',u'a2',u'a3'],[u'PNR1',u'TKT1',u'TEST',u'a5',u'a6'],[u'PNR1',u'TKT1',u'TEST',u'a8',u'a9']]
rdd= sc.parallelize (a)

然后我尝试:

rdd.map(lambda x: (x[0],x[1],x[2], list(x[3:])))
.toDF(["col1","col2","col3","col4"])
.groupBy("col1","col2","col3")
.agg(collect_list("col4")).show

最后我应该找到这个:

[col1,col2,col3,col4]=[u'PNR1',u'TKT1',u'TEST',[[u'a2',u'a3'][u'a5',u'a6'][u'a8',u'a9']]]

,但问题是我无法收集列表。

如果有人可以帮助我,我会很感激

我终于找到了一个解决方案,这不是最好的方法,但我可以继续工作...

from pyspark.sql.functions import udf
from pyspark.sql.functions import *
def example(lista):
    d = [[] for x in range(len(lista))]
    for index, elem in enumerate(lista):
      d[index] = elem.split("@")
    return d
example_udf = udf(example, LongType())
a = [[u'PNR1',u'TKT1',u'TEST',u'a2',u'a3'],[u'PNR1',u'TKT1',u'TEST',u'a5',u'a6'],[u'PNR1',u'TKT1',u'TEST',u'a8',u'a9']]
rdd= sc.parallelize (a)
df = rdd.toDF(["col1","col2","col3","col4","col5"])
df2=df.withColumn('col6', concat(col('col4'),lit('@'),col('col5'))).drop(col("col4")).drop(col("col5")).groupBy([col("col1"),col("col2"),col("col3")]).agg(collect_set(col("col6")).alias("col6"))
df2.map(lambda x: (x[0],x[1],x[2],example(x[3]))).collect()

它给出:

[(u'PNR1', u'TKT1', u'TEST', [[u'a2', u'a3'], [u'a5', u'a6'], [u'a8', u'a9']])]

希望这种解决方案可以帮助其他人。

感谢您的所有答案。

这可能会做您的工作(或给您一些进一步的想法)...

一个想法是将您的col4转换为原始数据类型,即字符串:

from pyspark.sql.functions import collect_list
import pandas as pd
a = [[u'PNR1',u'TKT1',u'TEST',u'a2',u'a3'],[u'PNR1',u'TKT1',u'TEST',u'a5',u'a6'],[u'PNR1',u'TKT1',u'TEST',u'a8',u'a9']]
rdd = sc.parallelize(a)
df = rdd.map(lambda x: (x[0],x[1],x[2], '(' + ' '.join(str(e) for e in x[3:]) + ')')).toDF(["col1","col2","col3","col4"])
df.groupBy("col1","col2","col3").agg(collect_list("col4")).toPandas().values.tolist()[0]
#[u'PNR1', u'TKT1', u'TEST', [u'(a2 a3)', u'(a5 a6)', u'(a8 a9)']]

update (在您自己的答案之后):

我真的认为上面达到的观点足以根据您的需要进一步适应它,此外,我目前没有时间自己做。因此,这里(修改了我的df定义以摆脱括号后,这只是一个列表理解的问题):

df = rdd.map(lambda x: (x[0],x[1],x[2], ' '.join(str(e) for e in x[3:]))).toDF(["col1","col2","col3","col4"])
# temp list:
ff = df.groupBy("col1","col2","col3").agg(collect_list("col4")).toPandas().values.tolist()[0]
ff
# [u'PNR1', u'TKT1', u'TEST', [u'a2 a3', u'a5 a6', u'a8 a9']]
# final list of lists:
ll = ff[:-1] + [[x.split(' ') for x in ff[-1]]]
ll

给出您最初要求的结果:

[u'PNR1', u'TKT1', u'TEST', [[u'a2', u'a3'], [u'a5', u'a6'], [u'a8', u'a9']]]  # requested output

与您自己的答案中提供的方法相比

  • 它避免了pyspark UDF,已知很慢
  • 所有处理都是在最终(希望较小)的汇总数据中完成的,而不是在初始数据(大概更大)数据中添加和删除列和执行MAP函数和UDF

,因为您无法更新为2.x,您的唯一选项是RDD API。用以下方式替换当前代码。

rdd.map(lambda x: ((x[0], x[1], x[2]), list(x[3:]))).groupByKey().toDF()

相关内容

  • 没有找到相关文章

最新更新