将一个数据框中的单个值映射到另一个数据框中的值



我有一个包含两列的数据帧 (DF1)

+-------+------+ |词语 |价值 | +-------+------+ |ABC |1.0 | |XYZ |2.0 | |DEF |3.0 | |GHI |4.0 | +-------+------+

和另一个数据帧(DF2),如下所示

+-----------------------------+ |字符串 | +-----------------------------+ |美国广播公司 | |XYZ ABC DEF |                +-----------------------------+

我必须将 DF2 中的各个字符串值替换为 DF1 中的相应值,例如,操作后,我应该取回这个数据帧。

+-----------------------------+ |字符串到双 | +-----------------------------+ |1.0 3.0 4.0 | |2.0 1.0 3.0 |                +-----------------------------+

我已经尝试了多种方法,但我似乎无法找出解决方案。

def createCorpus(conversationCorpus: Dataset[Row], dataDictionary: Dataset[Row]): Unit = {
import spark.implicits._
def getIndex(word: String): Double = {
val idxRow = dataDictionary.selectExpr("index").where('words.like(word))
val idx = idxRow.toString
if (!idx.isEmpty) idx.trim.toDouble else 1.0
}
conversationCorpus.map { //eclipse doesnt like this map here.. throws an error..
r =>
def row = {
val arr = r.getString(0).toLowerCase.split(" ")
val arrList = ArrayBuffer[Double]()
arr.map {
str =>
val index = getIndex(str)
}
Row.fromSeq(arrList.toSeq)
}
row
}
}

合并多个数据帧以创建新列需要联接。通过查看您的两个数据帧,我们似乎可以通过wordsdf1列和stringdf2联接,但string列稍后需要explode和组合(这可以通过在爆炸前为每一行提供唯一 ID 来完成)。monotically_increasing_iddf2的每一行提供唯一的 IDsplit函数将string转换为数组以进行分解。然后你可以join它们。然后,其余步骤是通过执行groupBy聚合分解的行合并回原始行

最后收集的数组列可以使用udf函数更改为所需的字符串列

长话短说,以下解决方案应该适合您

import org.apache.spark.sql.functions._
def arrayToString = udf((array: Seq[Double])=> array.mkString(" "))
df2.withColumn("rowId", monotonically_increasing_id())
.withColumn("string", explode(split(col("string"), " ")))
.join(df1, col("string") === col("words"))
.groupBy("rowId")
.agg(collect_list("value").as("stringToDouble"))
.select(arrayToString(col("stringToDouble")).as("stringToDouble"))

应该给你

+--------------+
|stringToDouble|
+--------------+
|1.0 3.0 4.0   |
|2.0 1.0 3.0   |
+--------------+

最新更新