尝试使用spark 2.1 计算单词同现
我的输入数据看起来像:
+---+-------------------+
| id| keywords|
+---+-------------------+
| 8| [mouse, cat]|
| 9| [bat, cat]|
| 10|[mouse, house, cat]|
+---+-------------------+
我想要的结果是这些行中的关键字组合,如所示
+-------+--------+
| word1 | word2 |
+-------+--------+
| cat | mouse |
| bat | cat |
| cat | house |
| cat | mouse |
| house | mouse |
+-------+--------+
由于输入行很少有超过20个左右的关键字,Scala的combinations((似乎足以构建出现对。
给定一个fn和UDF,将其包装为:
def combine(items: Seq[String]) = {
items.sorted.combinations(2).toList
}
val combineUDF = udf(combine _)
使用一个简单的序列,我可以获得出现的关键字对,如下所示:
val simpleSeq = Seq("cat", "mouse", "house")
println(combine(simpleSeq)
List(List(cat, house), List(cat, mouse), List(house, mouse))
使用数据帧
使用输入数据的数据帧,如中所示
val comboDF = sourceDF.withColumn("combinations", combineUDF($"keywords"))
comboDF.printSchema
comboDF.show
comboDF: org.apache.spark.sql.DataFrame = [id: int, keywords: array<string> ... 1 more field]
root
|-- id: integer (nullable = false)
|-- keywords: array (nullable = true)
| |-- element: string (containsNull = true)
|-- combinations: array (nullable = true)
| |-- element: array (containsNull = true)
| | |-- element: string (containsNull = true)
+---+-------------------+--------------------+
| id| keywords| combinations|
+---+-------------------+--------------------+
| 9| [mouse, cat]|[WrappedArray(cat...|
| 8| [bat, cat]|[WrappedArray(bat...|
| 10|[mouse, house, cat]|[WrappedArray(cat...|
+---+-------------------+--------------------+
接下来,我想提取组合列中的每个对,每个对作为一行。
我不知道该怎么做。
添加的列是[WrappedArray[WrappedArray[String]]]
类型,我似乎无法映射到它:
import scala.collection.mutable.WrappedArray
comboDF.map(row => row.get(2).asInstanceOf[WrappedArray[Seq[String]]].array).show
<console>:55: error: Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
comboDF.map(row => row.get(2).asInstanceOf[WrappedArray[Seq[String]]].array).show
使用RDD
为了解决基于DF的明显无法处理嵌套包装数组的问题,我尝试了RDD(我不太熟悉(。
我可以通过获得每行关键字的组合
val wrappedPairs = listDF.select("keywords").
rdd.collect.map(r =>
combine(r.get(0).asInstanceOf[WrappedArray[String]].array.toList)
Array[List[Seq[String]]] = Array(List(List(cat, mouse)), List(List(bat, cat)), List(List(cat, house), List(cat, mouse), List(house, mouse)))
这基本上给了我:
Array(
List(
List(cat, mouse)
),
List(
List(bat, cat)
),
List(
List(cat, house),
List(cat, mouse),
List(house, mouse)
)
)
我想去:
+-------+--------+
| word1 | word2 |
+-------+--------+
| cat | mouse |
| bat | cat |
| cat | house |
| cat | mouse |
| house | mouse |
+-------+--------+
我可以使用println获得这些对,但似乎不知道如何将它们作为行取出:
wrappedPairs.map(outerList => outerList.asInstanceOf[List[List[String]]].
map(innerList => innerList.asInstanceOf[List[String]]).
map(pair => (pair(0),pair(1).toSeq)).foreach(println)
)
wrappedPairs: Array[List[Seq[String]]] = Array(List(List(cat, mouse)), List(List(bat, cat)), List(List(cat, house), List(cat, mouse), List(house, mouse)))
(cat,mouse)
(bat,cat)
(cat,house)
(cat,mouse)
(house,mouse)
你非常接近!您需要的两件事是explode
和如何从结构类型中投影字段。explode
将把一行中的项目列表扩展到单独的行(复制所有其他字段(。分解后,您需要在"组合"中选择数组中的值。有几种方法可以做到这一点,但我通常按照下面的方式来做。
查看此示例代码:
val input = Seq(
(Seq("mouse", "cat")),
(Seq("bat", "cat")),
(Seq("mouse", "house", "cat"))
).toDF("keywords")
def combine(items: Seq[String]) = {
items.sorted.combinations(2).toList
}
val combineUDF = udf(combine _)
val df = input.withColumn("combinations", explode(combineUDF($"keywords")))
df.show
这会给你一个这样的DataFrame:
+-------------------+--------------+
| keywords| combinations|
+-------------------+--------------+
| [mouse, cat]| [cat, mouse]|
| [bat, cat]| [bat, cat]|
|[mouse, house, cat]| [cat, house]|
|[mouse, house, cat]| [cat, mouse]|
|[mouse, house, cat]|[house, mouse]|
+-------------------+--------------+
现在,您只需从每行的数组中选择两个字段,如下所示:
val df2 = df.selectExpr("combinations[0] as word1", "combinations[1] as word2")
df2.show
输出:
+-----+-----+
|word1|word2|
+-----+-----+
| cat|mouse|
| bat| cat|
| cat|house|
| cat|mouse|
|house|mouse|
+-----+-----+