我有以下数据框:
|-- k1: array (nullable = true)
| |-- element: long (containsNull = true)
|-- k2: string (nullable = true)
|-- k3: array (nullable = true)
| |-- element: long (containsNull = true)
和
|-- k1: long (nullable = true)
|-- k2: string (nullable = true)
|-- k3: long (nullable = true)
请注意,列名和类型是相同的,只是第一个数据框的k1
和k3
是数组。我想在k2
上连接这两个数据框,以便将第二个数据框的条目附加到第一个数据框中的数组中。例如,如果数据框
+---------+---+------------+
| k1 |k2 | k3 |
+---------+---+------------+
|[1, 2, 3]|foo| [4, 5, 6]|
|[7, 8, 9]|bar|[10, 11, 12]|
+---------+---+------------+
和
+-----+---+------+
|k1 |k2 |k3 |
+-----+---+------+
| 4|foo| 7|
| 10|bar| 13|
+-----+---+------+
那么连接的结果应该是
+---+-------------+----------------+
|key| click | search |
+---+-------------+----------------+
|foo|[1, 2, 3, 4] | [4, 5, 6, 7] |
|bar|[7, 8, 9, 10]|[10, 11, 12, 13]|
+---+-------------+----------------+
我的第一个方法是做一个内部连接来得到
+---+---------+------------+-----+------+
|key| click| search|click|search|
+---+---------+------------+-----+------+
|foo|[1, 2, 3]| [4, 5, 6]| 3| 6|
|bar|[7, 8, 9]|[10, 11, 12]| 9| 12|
+---+---------+------------+-----+------+
然后在生成的数据框上执行 foreach 以将所需的行条目附加到数组中,最后删除后两列。但是我无法让打字为foreach工作。
我正在使用带有Spark 1.6.1的Java 8。我是Spark的新手,所以任何指导将不胜感激。
使用java有点棘手。
假设两个数据帧都注册为 t1,则 t2 表使用 udf 合并两个数组。
sql.udf().register("unionArray", (Seq<Long> arr1, Seq<Long> arr2) -> {
List<Long> output =new ArrayList<Long>();
//Convert Seq object to java list and add to output
output.addAll(scala.collection.JavaConversions.asJavaList(arr1));
output.addAll(scala.collection.JavaConversions.asJavaList(arr2));
//Convert java list output scala Seq
return Option.apply(scala.collection.JavaConverters.asScalaIterableConverter(output).asScala().toSeq());
}, DataTypes.createArrayType(DataTypes.LongType));
sql.sql("select t1.k2 as key,unionArray(t1.k1,t2.k1) as click, unionArray(t1.k3,t2.k3) as search from t1 join t2 on t1.k2 = t2.k2").show();
希望这会有所帮助。
这不是 Java 中的解决方案,但也许 Scala 中的以下方法使用UDF
来追加列可以作为参考:
val df1 = Seq(
(Seq(1, 2, 3), "foo", Seq(4, 5, 6)),
(Seq(7, 8, 9), "bar", Seq(10, 11, 12))
).toDF("k1", "k2", "k3")
val df2 = Seq(
(4, "foo", 7),
(10, "bar", 13)
).toDF("k1", "k2", "k3")
def appendCol = udf(
(a: Seq[Int], x: Int) => a :+ x
)
val df3 = df1.join( df2, Seq("k2") ).
withColumn( "click", appendCol(df1("k1"), df2("k1")) ).
withColumn( "search", appendCol(df1("k3"), df2("k3")) )
df3.select( col("k2").as("key"), col("click"), col("search") ).show
+---+-------------+----------------+
|key| click| search|
+---+-------------+----------------+
|foo| [1, 2, 3, 4]| [4, 5, 6, 7]|
|bar|[7, 8, 9, 10]|[10, 11, 12, 13]|
+---+-------------+----------------+