我有一个由两列C1和C2组成的数据集。列与多对多的关系相关联。
我想做的是为每个C2找到与C2值最相关的值C1。
例如:
C1 | C2
1 | 2
1 | 5
1 | 9
2 | 9
2 | 8
我们可以在这里看到1与C2的3个值相匹配,而2与2相匹配,所以我想输出:
Out1 |Out2| matches
2 | 1 | 3
5 | 1 | 3
9 | 1 | 3 (1 wins because 3>2)
8 | 2 | 2
到目前为止我所做的是:
dataset = sc.textFile("...").
map(lambda line: (line.split(",")[0],list(line.split(",")[1]) ) ).
reduceByKey(lambda x , y : x+y )
这样做是为了收集每个C1值的所有C2匹配,这个列表的计数是我们想要的匹配列。我现在想要的是使用这个列表中的每个值作为一个新的键,并有一个映射,如:
(Key ,Value_list[value1,value2,...]) -->(value1 , key ),(value2 , key)...
如何使用spark来完成?任何建议都会很有帮助的。
提前感谢!
数据框架API可能更容易完成这类任务。您可以按C1分组,获得计数,然后按C2分组,并获得匹配次数最多的C1值。
import pyspark.sql.functions as F
df = spark.read.csv('file.csv', header=True, inferSchema=True)
df2 = (df.groupBy('C1')
.count()
.join(df, 'C1')
.groupBy(F.col('C2').alias('Out1'))
.agg(
F.max(
F.struct(F.col('count').alias('matches'), F.col('C1').alias('Out2'))
).alias('c')
)
.select('Out1', 'c.Out2', 'c.matches')
.orderBy('Out1')
)
df2.show()
+----+----+-------+
|Out1|Out2|matches|
+----+----+-------+
| 2| 1| 3|
| 5| 1| 3|
| 8| 2| 2|
| 9| 1| 3|
+----+----+-------+
使用dataframe API可以很容易地得到想要的结果。
from pyspark.sql import *
import pyspark.sql.functions as fun
from pyspark.sql.window import Window
spark = SparkSession.builder.master("local[*]").getOrCreate()
# preparing sample dataframe
data = [(1, 2), (1, 5), (1, 9), (2, 9), (2, 8)]
schema = ["c1", "c2"]
df = spark.createDataFrame(data, schema)
output = df.withColumn("matches", fun.count("c1").over(Window.partitionBy("c1")))
.groupby(fun.col('C2').alias('out1'))
.agg(fun.first(fun.col("c1")).alias("out2"), fun.max("matches").alias("matches"))
output.show()
# output
+----+----+-------+
|Out1|out2|matches|
+----+----+-------+
| 9| 1| 3|
| 5| 1| 3|
| 8| 2| 2|
| 2| 1| 3|
+----+----+-------+