我正在尝试编写一个函数,该函数将转换 df -> df2,如下所示:
// input dataframe df
+-----+-----+
| T | S |
+-----+-----+
| A| 4|
| B| 8|
| C| 8|
| D| 2|
+-----+-----+
我需要一个将df
作为输入并返回df2
作为输出的函数。
// output dataframe df2
+-----+-----+-----+
| T1 | T2 | S=T1+T2 |
+-----+-----+-----+
| A| B| 12|
| A| C| 12|
| A| D| 6|
| B| C| 16|
| B| D| 10|
| C| D| 10|
+-----+-----+-----+
编辑我想出了这个解决方案。任何改进都将受到欢迎。
val sumOf = udf((left_score: Float, right_score: Float) => left_score + right_score)
val left = df.select("T", "S").withColumnRenamed("T", "T1").withColumnRenamed("S", "S1")
val right= df.select("T", "S").withColumnRenamed("T", "T2").withColumnRenamed("S", "S2")
val joinDF = left.join(right, left.col("T1") !== right.col("T2"))
val outDF = joinDF.withColumn("S", sumOf($"S1", $"S2")).select("T1", "T2", "S")
val df = sc.parallelize(Seq("A" -> 4, "B" -> 8, "C" -> 8, "D" -> 2))
.toDF("T", "S")
val df1 = df.withColumnRenamed("T", "T1")
.withColumnRenamed("S", "S1")
val df2 = df.withColumnRenamed("T", "T2")
.withColumnRenamed("S", "S2")
df1.join(df2, df1("T1") < df2("T2"))
.withColumn("S", 'S1 + 'S2)
.drop("S1", "S2")
.show
+---+---+---+
| T1| T2| S|
+---+---+---+
| A| B| 12|
| A| C| 12|
| A| D| 6|
| B| C| 16|
| B| D| 10|
| C| D| 10|
+---+---+---+
基本上你不想要一个完整的笛卡尔乘积。只有 T2> T1 的所有可能性。这就是连接条件在代码中的含义。请注意,笛卡尔积生成 n² 条记录。在这里,您将生成 n(n-1(/2 条记录。这小于 n²,但仍在 O(n²( 中,因此应尽可能避免...
撇开性能不谈(提示:不可能让 Spark 在大型笛卡尔产品上表现良好(,您可以使用 Spark 2.x 中引入的交叉连接
import sc.implicits._
val df = sc.parallelize(Seq("A" -> 4, "B" -> 8, "C" -> 8, "D" -> 2))
.toDF("T", "S")
df.as("df1")
.crossJoin(df.as("df2"))
.filter($"df1.T" =!= $"df2.T")
.select($"df1.T".as("T1"), $"df2.T".as("T2"))
.withColumn("S", $"df1.S"+$"df2.S") // you can use udf here as well
使用内部连接可以实现相同的结果,这使其与Spark 1.6.x兼容
import sc.implicits._
val df = sc.parallelize(Seq("A" -> 4, "B" -> 8, "C" -> 8, "D" -> 2))
.toDF("T", "S")
df.as("df1")
.join(df.as("df2"), Seq("T"), "inner") // this line is different
.filter($"df1.T" =!= $"df2.T")
.select($"df1.T".as("T1"), $"df2.T".as("T2"))
.withColumn("S", $"df1.S"+$"df2.S") // you can use udf here as well
我建议的解决方案根本不需要您使用join
。但是该解决方案也很昂贵,因为所有数据都将累积到一个执行器进行处理。
我的解决方案是将内置功能(例如array
,collect_list
和explode
以及window
功能组合在一起,如下所示
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions._
def windowFunction = Window.orderBy("T").rowsBetween(1, Long.MaxValue)
df.withColumn("array", collect_list(array($"T", $"S")).over(windowFunction))
.withColumn("array", explode($"array"))
.select($"T".as("T1"), $"array"(0).as("T2"), ($"array"(1)+$"S").as("S=T1+T2"))
.show(false)
这应该给你你想要的输出,作为
+---+---+-------+
|T1 |T2 |S=T1+T2|
+---+---+-------+
|A |B |12.0 |
|A |C |12.0 |
|A |D |6.0 |
|B |C |16.0 |
|B |D |10.0 |
|C |D |10.0 |
+---+---+-------+