如何在Spark 2中搜索结构?



我在Spark 2.2.0Scala 2.11.8有以下两个DataFrames

df1 =
+----------+-------------------------------+
|item      |        other_items            |
+----------+-------------------------------+
|  111     |[[444,1.0],[333,0.5],[666,0.4]]|
|  222     |[[444,1.0],[333,0.5]]          |
|  333     |[]                             |
|  444     |[[111,2.0],[555,0.5],[777,0.2]]|
+----------+-------------------------------+

printScheme给出以下输出:

|-- item: string (nullable = true)
|-- other_items: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- item: string (nullable = true)
|    |    |-- rank: double (nullable = true)

和:

df2 = 
+----------+-------------+
|itemA     | itemB       |
+----------+-------------+
|  111     | 333         |
|  222     | 444         |
|  333     | 555         |
|  444     | 777         |
+----------+-------------+

对于df2中的每一对,我想从df1中找到rank。为此,我应该在df1中找到相同的对,以便df1.item等于df2.itemA并且other_items.struct.[item]等于df2.itemB。如果找不到这样的对,则排名应为 0。

结果应该是这个:

+----------+-------------+-------------+
|itemA     | itemB       |  rank       |
+----------+-------------+-------------+
|  111     | 333         |   0.5       |
|  222     | 444         |   1.0       |
|  333     | 555         |   0.0       |
|  444     | 777         |   0.2       |
+----------+-------------+-------------+

我该怎么做?

这应该可以做你想要的。诀窍是在连接之前other_items爆炸:

df2.as("df2").join(
df1.select($"item", explode($"other_items").as("other_items")).as("df1"),
$"df2.itemA" === $"df1.item" and $"df2.itemB" === $"df1.other_items.item"
, "left"
)
.select($"itemA", $"itemB", coalesce($"df1.other_items.rank", lit(0.0)).as("rank"))
.show()

您可以通过定义一个udf函数来实现您的要求,udf并在将两个dataframejoin

import org.apache.spark.sql.functions._
def findRank = udf((items: mutable.WrappedArray[String], ranks: mutable.WrappedArray[Double], itemB: String) => {
val index = items.indexOf(itemB)
if(index != -1) ranks(index) else 0.0
})
df1.join(df2, df1("item") === df2("itemA"), "right")
.select(df2("itemA"), df2("itemB"), findRank(df1("other_items.item"), df1("other_items.rank"), df2("itemB")).as("rank"))
.show(false)

你应该得到dataframe

+-----+-----+----+
|itemA|itemB|rank|
+-----+-----+----+
|111  |333  |0.5 |
|222  |444  |1.0 |
|333  |555  |0.0 |
|444  |777  |0.2 |
+-----+-----+----+

相关内容

  • 没有找到相关文章

最新更新