使用pyspark将键和值列表映射到键值



我有一个由两列C1和C2组成的数据集。列与多对多的关系相关联。

我想做的是为每个C2找到与C2值最相关的值C1。

例如:

C1  | C2
1  | 2
1  | 5
1  | 9
2  | 9
2  | 8

我们可以在这里看到1与C2的3个值相匹配,而2与2相匹配,所以我想输出:

Out1 |Out2| matches
2  | 1  | 3
5  | 1  | 3
9  | 1  | 3 (1 wins because 3>2)
8  | 2  | 2

到目前为止我所做的是:

dataset = sc.textFile("...").
map(lambda line: (line.split(",")[0],list(line.split(",")[1]) ) ).
reduceByKey(lambda x , y : x+y )   

这样做是为了收集每个C1值的所有C2匹配,这个列表的计数是我们想要的匹配列。我现在想要的是使用这个列表中的每个值作为一个新的键,并有一个映射,如:

(Key ,Value_list[value1,value2,...]) -->(value1 , key ),(value2 , key)...

如何使用spark来完成?任何建议都会很有帮助的。

提前感谢!

数据框架API可能更容易完成这类任务。您可以按C1分组,获得计数,然后按C2分组,并获得匹配次数最多的C1值。

import pyspark.sql.functions as F
df = spark.read.csv('file.csv', header=True, inferSchema=True)
df2 = (df.groupBy('C1')
.count()
.join(df, 'C1')
.groupBy(F.col('C2').alias('Out1'))
.agg(
F.max(
F.struct(F.col('count').alias('matches'), F.col('C1').alias('Out2'))
).alias('c')
)
.select('Out1', 'c.Out2', 'c.matches')
.orderBy('Out1')
)
df2.show()
+----+----+-------+
|Out1|Out2|matches|
+----+----+-------+
|   2|   1|      3|
|   5|   1|      3|
|   8|   2|      2|
|   9|   1|      3|
+----+----+-------+

使用dataframe API可以很容易地得到想要的结果。

from pyspark.sql import *
import pyspark.sql.functions as fun
from pyspark.sql.window import Window
spark = SparkSession.builder.master("local[*]").getOrCreate()
# preparing sample dataframe
data = [(1, 2), (1, 5), (1, 9), (2, 9), (2, 8)]
schema = ["c1", "c2"]
df = spark.createDataFrame(data, schema)

output = df.withColumn("matches", fun.count("c1").over(Window.partitionBy("c1"))) 
.groupby(fun.col('C2').alias('out1')) 
.agg(fun.first(fun.col("c1")).alias("out2"), fun.max("matches").alias("matches"))
output.show()
# output
+----+----+-------+
|Out1|out2|matches|
+----+----+-------+
|   9|   1|      3|
|   5|   1|      3|
|   8|   2|      2|
|   2|   1|      3|
+----+----+-------+

最新更新