如何使用SPARK DataFrame API按id分组,计算组中的所有值组合,并生成单个输出数据帧?
示例:
val testSchema = StructType(Array(
StructField("id", IntegerType),
StructField("value", StringType)))
val test_rows = Seq(
Row(1, "a"),
Row(1, "b"),
Row(1, "c"),
Row(2, "a"),
Row(2, "d"),
Row(2, "e")
)
val test_rdd = sc.parallelize(test_rows)
val test_df = sqlContext.createDataFrame(test_rdd, testSchema)
预期输出:
1 a b
1 a c
1 b c
2 a d
2 a e
2 d e
迄今为止的最佳解决方案:
执行自联接,过滤id相等并消除相等值
val result = test_df.join(
test_df.select(test_df.col("id").as("r_id"), test_df.col("value").as("r_value")),
($"id" === $"r_id") and ($"value" !== $"r_value")).select("id", "value", "r_value")
+---+-----+-------+
| id|value|r_value|
+---+-----+-------+
| 1| a| b|
| 1| a| c|
| 1| b| a|
| 1| b| c|
| 1| c| a|
| 1| c| b|
| 2| a| d|
| 2| a| e|
| 2| d| a|
| 2| d| e|
| 2| e| a|
| 2| e| d|
+---+-----+-------+
剩余问题:在执行联接时,如何消除重复集,例如(a,b)和(b,a)?
您对值字段中的对象有排序吗?如果是这样的话,您似乎可以将数据帧与其本身连接起来,同时要求id相同,并且左表的值小于右表的值。
[edit]如果您没有排序,并且每个id的值足够少,那么另一种解决方案是使用groupByKey
,然后从生成的序列中创建所有组合,这可以比创建所有对然后只保留一半更简单。(例如,如果您使用Scala,我相信Seq
的combination
函数[doc]会满足您的需要。)对于大多数数据集,这将比自联接方法执行得更差。