用列上的子字符串条件连接数据帧



我正在用Scala编写函数来获取用于训练ML模型的数据。我有一个数据框架DF1,它有一个由名称组成的列。另一个数据框架DF2由列[description, released,…]更多)

我想创建数据框架DF3,它是DF1和DF2的连接,条件是DF1的名称应该在DF2的描述中。

的例子:

DF1
name
0  John
1  Mike
2  Kara
DF2
released total  description
0  2006     5      This involved John and Kara who played role of......
1  2010     120    It is the latest release of Mike and Kara after almost decade...

DF3 [Expected output DF]
name    released  total description
0      John    2006      5     This involved John and Kara who played role of......
1      Kara    2006      5     This involved John and Kara who played role of......
2      Kara    2010      120   It is the latest release of Mike and Kara after almost decade...
3      Mike    2010      120  It is the latest release of Mike and Kara after almost decade...

我正在尝试做交叉连接,使所有的组合,然后过滤出基于列名和描述的条件。

val DF3 = DF1.crossjoin(DF2).filter(col("name") in col("description"))

似乎在Snowpark中没有可用的contains方法来做这件事。

有人知道怎么做吗?

至少有两种解决方案,但你应该问自己一些问题:

  1. 你是想找到子字符串,还是想找到一个单词?例如,你想找到Karan的名字Kara吗?
  2. 你想在这种状态下存储或使用结果数据帧吗?也许你想以更优化的方式存储/使用它,例如,为每个名称存储DF2的索引/行位置。

你可以测试(在大而真实的数据集上)哪一个更快更适合你。

第一个解决方案(通过DataFrame)

import org.apache.spark.SparkContext
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
object Main extends App {
case class Name(name: String)
case class TextInfo(year: Int, moth: Int, text: String)
val spark: SparkSession = SparkSession.builder.config("spark.master", "local").getOrCreate()
val sc: SparkContext = spark.sparkContext
val namesDf: DataFrame = spark.createDataFrame(sc.parallelize(Seq("John", "Mike", "Kara").map(Name)))
val textToSearchDf: DataFrame = spark.createDataFrame(sc.parallelize(Seq(
TextInfo(2006, 5, "This involved John and Kara who played role of"),
TextInfo(2010, 120, "It is the latest release of Mike and Kara after almost decade")
)))
val resultDf: DataFrame = textToSearchDf.crossJoin(namesDf)
.where(new Column($"text" contains $"name"))
resultDf.foreach(println(_))
}

第二个解决方案,通过RDD:

val spark: SparkSession = SparkSession.builder.config("spark.master", "local").getOrCreate()
val sc: SparkContext = spark.sparkContext
val namesAsRdd: RDD[String] = sc.parallelize(Seq("John", "Mike", "Kara"))
val rddWithTextToSearch: RDD[(Int, Int, String)] = sc.parallelize(Seq(
(2006, 5, "This involved John and Kara who played role of"),
(2010, 120, "It is the latest release of Mike and Kara after almost decade")
))
val names: Set[String] = namesAsRdd.collect().toSet
val resultRdd: RDD[(String, Int, Int, String)] = rddWithTextToSearch.flatMap {
case (year, month, str) => names.filter(name => str.contains(name)).map(name => (name, year, month, str))
}
resultRdd.foreach(println(_))

相关内容

  • 没有找到相关文章

最新更新