将字符串与其他列pyspark进行分组连接



数据框架如下

+----------+--------------------+
|CustomerNo| desc               |
+----------+--------------------+
|  351856.0| FORM & BEAUTY 075 P|
|  351856.0| FORM & BEAUTY 075 P|
|  326022.0|            D 151 HP|
|   69430.0|Shape Sensation 0...|
|   38018.0|   Maximizer 846 WHU|
|   69712.0|Shape Sensation 0...|
|   71228.0|   Aqua Festive WHUD|
|   71228.0|Maximizer 736 WHU...|
|   73200.0|  T-Shirt Bra 081 HP|
|   73200.0|  T-Shirt Bra 081 HP|
|   73200.0|  T-Shirt Bra 081 HP|
|   74540.0|Form & Beauty 052 HP|
|   74578.0|          G 56 WP 01|
|   74578.0|          G 56 WP 01|
|   74578.0|          G 56 WP 01|
|   74578.0|          G 56 WP 01|
|   74578.0|          G 56 WP 01|
|   74578.0|          G 56 WP 01|
|   74578.0|          G 56 WP 01|
|   74578.0|          G 56 WP 01|
+----------+--------------------+

我需要按CustomerNo上的数据分组并连接字符串列。

我正在使用下面的代码,但它给出错误

df = retail_df.groupBy('CustomerNo').agg(F.concat('desc').alias('concat_Desc'))
谁能告诉我怎么做这个?

您可以按CustomerNo上的数据框架分组,然后执行收集列表。接下来,您可以使用concat_ws

将单个列的列表项连接起来见下面的代码,

retail_df 
.groupBy('CustomerNo') 
.agg(F.collect_list('desc').alias('items')) 
.withColumn("concat_Desc", F.concat_ws(",", "items"))

此解决方案不使用udf,因此在性能方面会更好。

这不是很清楚你想要的输出是什么,但如果我理解正确的话,你会喜欢像下面这样的解决方案,它使用collect_list将数组中的所有项目分组,并使用udf将这样的数组元素加入到字符串中:

import pyspark.sql.functions as F
@F.udf('string')
def concat_into_string(l):
return ' - '.join(l)
df = retail_df 
.groupBy('CustomerNo').agg(F.collect_list('desc').alias('desc')) 
.withColumn('final_string', concat_into_string('desc'))

例子
df = spark.createDataFrame([
(1, 'A'),
(1, 'A'),
(2, 'B'),
(3, 'C1'),
(3, 'C2'),
(4, 'D'),
(4, 'D'),
(4, 'D'),
(4, 'D')
], ('CustomerNo', 'desc'))
df 
.groupBy('CustomerNo').agg(F.collect_list('desc').alias('desc')) 
.withColumn('final_string', concat_into_string('desc')) 
.show()
+----------+------------+-------------+
|CustomerNo|        desc| final_string|
+----------+------------+-------------+
|         1|      [A, A]|        A - A|
|         2|         [B]|            B|
|         3|    [C1, C2]|      C1 - C2|
|         4|[D, D, D, D]|D - D - D - D|
+----------+------------+-------------+
import pyspark.sql.functions as F
df = spark.read.option("inferschema","true").option("header","true").csv("/FileStore/tables/test.csv")
print("Sample Data")
df.select("eid","ename").show()
print("")
print("")
print("Final Ouput")
df.select("eid","ename").groupBy("eid").agg(F.concat_ws(", ", F.collect_list("ename")).alias("desc")).show()
Sample Data
+---+-----+
|eid|ename|
+---+-----+
|  1|    a|
|  1|    b|
|  2|    c|
|  2|    d|
|  3|    e|
|  4|    f|
|  4|    g|
+---+-----+

Final Ouput
+---+----+
|eid|desc|
+---+----+
|  1|a, b|
|  3|   e|
|  4|f, g|
|  2|c, d|
+---+----+

最新更新