I'm writing a function in Scala to fetch data for training an ML model. I have a DataFrame DF1 with a single column of names. Another DataFrame DF2 has the columns [description, released, ...].
I want to create DataFrame DF3 as a join of DF1 and DF2, with the condition that DF1's name appears in DF2's description.
Example:
DF1
name
0 John
1 Mike
2 Kara
DF2
released total description
0 2006 5 This involved John and Kara who played role of......
1 2010 120 It is the latest release of Mike and Kara after almost decade...
DF3 [Expected output DF]
name released total description
0 John 2006 5 This involved John and Kara who played role of......
1 Kara 2006 5 This involved John and Kara who played role of......
2 Kara 2010 120 It is the latest release of Mike and Kara after almost decade...
3 Mike 2010 120 It is the latest release of Mike and Kara after almost decade...
I'm trying to do a cross join to get all combinations, and then filter based on the name and description columns:
val DF3 = DF1.crossjoin(DF2).filter(col("name") in col("description"))
It seems there is no contains method available in Snowpark to do this.
Does anyone know how to do it?
There are at least two solutions, but first you should ask yourself a couple of questions:
- Do you want to match a substring, or a whole word? For example, should the name Kara match inside Karan?
- Do you want to store or use the resulting DataFrame in this shape? Perhaps you want to store/use it in a more optimized form, e.g. storing, for each name, the index/row position in DF2.
You can test (on a large, realistic dataset) which one is faster and better suited to you.
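To make the first question concrete, here is a minimal plain-Scala sketch (no Spark involved) comparing a raw substring check against a word-boundary regex; the sentence and names are made up for illustration:

```scala
val text = "This involved Karan and John"

// Substring semantics: "Kara" is found inside "Karan"
val substringHit = text.contains("Kara")

// Whole-word semantics: require word boundaries around the name
def containsWord(haystack: String, word: String): Boolean =
  ("\\b" + java.util.regex.Pattern.quote(word) + "\\b").r.findFirstIn(haystack).isDefined

// "Kara" inside "Karan" has no trailing word boundary, so this is false
val wordHit = containsWord(text, "Kara")

println(s"substring match: $substringHit, whole-word match: $wordHit")
```

Whichever semantics you pick, the same predicate can be plugged into either solution below.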
First solution (via DataFrame):
import org.apache.spark.SparkContext
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.{DataFrame, SparkSession}

object Main extends App {
  case class Name(name: String)
  case class TextInfo(released: Int, total: Int, text: String)

  val spark: SparkSession = SparkSession.builder.config("spark.master", "local").getOrCreate()
  val sc: SparkContext = spark.sparkContext

  val namesDf: DataFrame = spark.createDataFrame(sc.parallelize(Seq("John", "Mike", "Kara").map(Name)))
  val textToSearchDf: DataFrame = spark.createDataFrame(sc.parallelize(Seq(
    TextInfo(2006, 5, "This involved John and Kara who played role of"),
    TextInfo(2010, 120, "It is the latest release of Mike and Kara after almost decade")
  )))

  // Cross join every text row with every name, then keep pairs where the text contains the name
  val resultDf: DataFrame = textToSearchDf.crossJoin(namesDf)
    .where(col("text").contains(col("name")))

  resultDf.show(truncate = false)
}
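The cross-join-then-filter logic itself can be checked without a Spark cluster; this plain-Scala sketch mirrors it with a for-comprehension over the same sample data, yielding the four rows expected in DF3:

```scala
val names = Seq("John", "Mike", "Kara")
val records = Seq(
  (2006, 5, "This involved John and Kara who played role of"),
  (2010, 120, "It is the latest release of Mike and Kara after almost decade")
)

// Every (record, name) pair, kept only when the text contains the name
val joined = for {
  (released, total, text) <- records
  name <- names
  if text.contains(name)
} yield (name, released, total, text)

joined.foreach(println)
```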
Second solution, via RDD:
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

val spark: SparkSession = SparkSession.builder.config("spark.master", "local").getOrCreate()
val sc: SparkContext = spark.sparkContext

val namesAsRdd: RDD[String] = sc.parallelize(Seq("John", "Mike", "Kara"))
val rddWithTextToSearch: RDD[(Int, Int, String)] = sc.parallelize(Seq(
  (2006, 5, "This involved John and Kara who played role of"),
  (2010, 120, "It is the latest release of Mike and Kara after almost decade")
))

// Collect the (small) set of names to the driver so each record can be matched locally
val names: Set[String] = namesAsRdd.collect().toSet

val resultRdd: RDD[(String, Int, Int, String)] = rddWithTextToSearch.flatMap {
  case (released, total, text) =>
    names.filter(name => text.contains(name)).map(name => (name, released, total, text))
}

resultRdd.collect().foreach(println)
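If whole-word matching (from the questions at the top) is what you want, only the predicate inside the flatMap needs to change. This plain-Scala sketch shows the per-record logic on one sample record; the extra name "Kar" is a made-up example showing a substring that a word-boundary check rejects:

```scala
import java.util.regex.Pattern

val names = Set("John", "Mike", "Kara", "Kar") // "Kar" is a substring of "Kara" but not a word here
val record = (2006, 5, "This involved John and Kara who played role of")

// Whole-word predicate: the name must be delimited by word boundaries
def containsWord(haystack: String, word: String): Boolean =
  ("\\b" + Pattern.quote(word) + "\\b").r.findFirstIn(haystack).isDefined

val (released, total, text) = record
val matched = names.filter(name => containsWord(text, name))
  .map(name => (name, released, total, text))

matched.foreach(println)
```

Only "John" and "Kara" survive, even though text.contains("Kar") would be true under substring semantics.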