联接两个数据帧,以便将其中一个数据帧的条目追加到另一个数据帧的数组中



我有以下数据框:

|-- k1: array (nullable = true)
|    |-- element: long (containsNull = true)
|-- k2: string (nullable = true)
|-- k3: array (nullable = true)
|    |-- element: long (containsNull = true)

|-- k1: long (nullable = true)
|-- k2: string (nullable = true)
|-- k3: long (nullable = true)

请注意,列名和类型是相同的,只是第一个数据框的k1k3是数组。我想在k2上连接这两个数据框,以便将第二个数据框的条目附加到第一个数据框中的数组中。例如,如果数据框

+---------+---+------------+
|    k1   |k2 |      k3    |
+---------+---+------------+
|[1, 2, 3]|foo|   [4, 5, 6]|
|[7, 8, 9]|bar|[10, 11, 12]|
+---------+---+------------+

+-----+---+------+
|k1   |k2 |k3    |
+-----+---+------+
|    4|foo|     7|
|   10|bar|    13|
+-----+---+------+

那么连接的结果应该是

+---+-------------+----------------+
|key|    click    |      search    |
+---+-------------+----------------+
|foo|[1, 2, 3, 4] |   [4, 5, 6, 7] | 
|bar|[7, 8, 9, 10]|[10, 11, 12, 13]| 
+---+-------------+----------------+

我的第一个方法是做一个内部连接来得到

+---+---------+------------+-----+------+
|key|    click|      search|click|search|
+---+---------+------------+-----+------+
|foo|[1, 2, 3]|   [4, 5, 6]|    3|     6|
|bar|[7, 8, 9]|[10, 11, 12]|    9|    12|
+---+---------+------------+-----+------+

然后在生成的数据框上执行 foreach 以将所需的行条目附加到数组中,最后删除后两列。但是我无法让打字为foreach工作。

我正在使用带有Spark 1.6.1的Java 8。我是Spark的新手,所以任何指导将不胜感激。

使用java有点棘手。

假设两个数据帧都注册为 t1,则 t2 表使用 udf 合并两个数组。

sql.udf().register("unionArray", (Seq<Long> arr1, Seq<Long> arr2) -> {
List<Long> output =new ArrayList<Long>();
//Convert Seq object to java list and add to output
output.addAll(scala.collection.JavaConversions.asJavaList(arr1));
output.addAll(scala.collection.JavaConversions.asJavaList(arr2));
//Convert java list output scala Seq
return Option.apply(scala.collection.JavaConverters.asScalaIterableConverter(output).asScala().toSeq());
}, DataTypes.createArrayType(DataTypes.LongType));
sql.sql("select t1.k2 as key,unionArray(t1.k1,t2.k1) as click, unionArray(t1.k3,t2.k3) as search from t1 join t2 on t1.k2 = t2.k2").show();

希望这会有所帮助。

这不是 Java 中的解决方案,但也许 Scala 中的以下方法使用UDF来追加列可以作为参考:

val df1 = Seq(
(Seq(1, 2, 3), "foo", Seq(4, 5, 6)),
(Seq(7, 8, 9), "bar", Seq(10, 11, 12))
).toDF("k1", "k2", "k3")
val df2 = Seq(
(4, "foo", 7),
(10, "bar", 13)
).toDF("k1", "k2", "k3")
def appendCol = udf(
(a: Seq[Int], x: Int) => a :+ x
)
val df3 = df1.join( df2, Seq("k2") ).
withColumn( "click", appendCol(df1("k1"), df2("k1")) ).
withColumn( "search", appendCol(df1("k3"), df2("k3")) )
df3.select( col("k2").as("key"), col("click"), col("search") ).show
+---+-------------+----------------+
|key|        click|          search|
+---+-------------+----------------+
|foo| [1, 2, 3, 4]|    [4, 5, 6, 7]|
|bar|[7, 8, 9, 10]|[10, 11, 12, 13]|
+---+-------------+----------------+

相关内容

  • 没有找到相关文章

最新更新