I'm on Spark 2.2.0 with Scala 2.11.8 and have the following two DataFrames.
df1 =
+----------+-------------------------------+
|item | other_items |
+----------+-------------------------------+
| 111 |[[444,1.0],[333,0.5],[666,0.4]]|
| 222 |[[444,1.0],[333,0.5]] |
| 333 |[] |
| 444 |[[111,2.0],[555,0.5],[777,0.2]]|
+----------+-------------------------------+
printSchema gives the following output:
|-- item: string (nullable = true)
|-- other_items: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- item: string (nullable = true)
| | |-- rank: double (nullable = true)
and:
df2 =
+----------+-------------+
|itemA | itemB |
+----------+-------------+
| 111 | 333 |
| 222 | 444 |
| 333 | 555 |
| 444 | 777 |
+----------+-------------+
For each pair in df2 I want to find the rank in df1. That is, I need to find a match in df1 such that df1.item equals df2.itemA and the item field of one of the structs in other_items equals df2.itemB. If no such pair is found, the rank should be 0.

The result should be this:
+----------+-------------+-------------+
|itemA | itemB | rank |
+----------+-------------+-------------+
| 111 | 333 | 0.5 |
| 222 | 444 | 1.0 |
| 333 | 555 | 0.0 |
| 444 | 777 | 0.2 |
+----------+-------------+-------------+
How can I do this?
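For anyone who wants to reproduce this, here is a minimal sketch that builds the two sample DataFrames above (it assumes a spark-shell session, i.e. a SparkSession named spark; the OtherItem case class is only for this illustration):

import spark.implicits._

// a case class so the nested struct gets the field names item/rank
case class OtherItem(item: String, rank: Double)

val df1 = Seq(
  ("111", Seq(OtherItem("444", 1.0), OtherItem("333", 0.5), OtherItem("666", 0.4))),
  ("222", Seq(OtherItem("444", 1.0), OtherItem("333", 0.5))),
  ("333", Seq.empty[OtherItem]),
  ("444", Seq(OtherItem("111", 2.0), OtherItem("555", 0.5), OtherItem("777", 0.2)))
).toDF("item", "other_items")

val df2 = Seq(
  ("111", "333"), ("222", "444"), ("333", "555"), ("444", "777")
).toDF("itemA", "itemB")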
This should do what you want. The trick is to explode other_items before the join:
df2.as("df2").join(
df1.select($"item", explode($"other_items").as("other_items")).as("df1"),
$"df2.itemA" === $"df1.item" and $"df2.itemB" === $"df1.other_items.item"
, "left"
)
.select($"itemA", $"itemB", coalesce($"df1.other_items.rank", lit(0.0)).as("rank"))
.show()
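Note why the left join matters: explode produces no rows for an empty array, so item 333 vanishes from the exploded df1 entirely; the left join still keeps the (333, 555) pair, and coalesce turns its null rank into 0.0.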
You can achieve what you require by defining a udf function and joining the two dataframes as follows:
import org.apache.spark.sql.functions._
import scala.collection.mutable

// returns the rank at itemB's position in items, or 0.0 if itemB is absent;
// the null guard covers itemA values with no match in df1 after the right join
def findRank = udf((items: mutable.WrappedArray[String], ranks: mutable.WrappedArray[Double], itemB: String) => {
  if (items == null) 0.0
  else {
    val index = items.indexOf(itemB)
    if (index != -1) ranks(index) else 0.0
  }
})

df1.join(df2, df1("item") === df2("itemA"), "right")
  .select(df2("itemA"), df2("itemB"), findRank(df1("other_items.item"), df1("other_items.rank"), df2("itemB")).as("rank"))
  .show(false)
You should get the following dataframe:
+-----+-----+----+
|itemA|itemB|rank|
+-----+-----+----+
|111 |333 |0.5 |
|222 |444 |1.0 |
|333 |555 |0.0 |
|444 |777 |0.2 |
+-----+-----+----+
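For what it's worth, both approaches give the same result on this data. The explode-and-join version stays entirely in built-in SQL expressions, which Catalyst can optimize, while the udf version avoids the explode at the cost of a function that is opaque to the optimizer.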