Spark: mapping over a nested WrappedArray



I'm trying to compute word co-occurrence with Spark 2.1.

My input data looks like this:

+---+-------------------+
| id|           keywords|
+---+-------------------+
|  8|       [mouse, cat]|
|  9|         [bat, cat]|
| 10|[mouse, house, cat]|
+---+-------------------+

The result I want is every combination of two keywords within each row, as shown:

+-------+--------+
| word1 |  word2 |
+-------+--------+
| cat   |  mouse |
| bat   |  cat   |
| cat   |  house |
| cat   |  mouse |
| house |  mouse |
+-------+--------+

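Since an input row rarely has more than 20 or so keywords, Scala's combinations() seems sufficient for building the co-occurrence pairs.

Given a function, and a UDF that wraps it: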

def combine(items: Seq[String]) = {
  items.sorted.combinations(2).toList
}
val combineUDF = udf(combine _)

With a simple sequence, I can get the co-occurring keyword pairs like this:

val simpleSeq = Seq("cat", "mouse", "house")
println(combine(simpleSeq))
List(List(cat, house), List(cat, mouse), List(house, mouse))
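
As a quick sanity check on the sizing assumption, here is a minimal plain-Scala sketch (no Spark required). It reuses the combine helper from above with an explicit return type; the 20-keyword input named twenty is a made-up worst case for illustration, not real data.

```scala
// Same helper as above: sorting first gives every pair one canonical order.
def combine(items: Seq[String]): List[Seq[String]] =
  items.sorted.combinations(2).toList

// Input order does not matter once the items are sorted.
val a = combine(Seq("mouse", "cat"))
val b = combine(Seq("cat", "mouse"))
println(a == b)

// Hypothetical worst case of 20 distinct keywords: C(20, 2) = 190 pairs per row.
val twenty = (1 to 20).map(i => s"w$i")
println(combine(twenty).size)
```

At roughly 190 pairs for the largest rows, building the pairs per row stays cheap, which is why the per-row approach looks workable here.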

Using a DataFrame
With a DataFrame of the input data, as in:

val comboDF = sourceDF.withColumn("combinations", combineUDF($"keywords"))
comboDF.printSchema
comboDF.show
comboDF: org.apache.spark.sql.DataFrame = [id: int, keywords: array<string> ... 1 more field]
root
|-- id: integer (nullable = false)
|-- keywords: array (nullable = true)
|    |-- element: string (containsNull = true)
|-- combinations: array (nullable = true)
|    |-- element: array (containsNull = true)
|    |    |-- element: string (containsNull = true)
+---+-------------------+--------------------+
| id|           keywords|        combinations|
+---+-------------------+--------------------+
|  9|       [mouse, cat]|[WrappedArray(cat...|
|  8|         [bat, cat]|[WrappedArray(bat...|
| 10|[mouse, house, cat]|[WrappedArray(cat...|
+---+-------------------+--------------------+

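Next, I want to extract every pair from the combinations column, with each pair as its own row.

I'm not sure how to do that.

The added column is of type [WrappedArray[WrappedArray[String]]], and I can't seem to map over it: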

import scala.collection.mutable.WrappedArray
comboDF.map(row => row.get(2).asInstanceOf[WrappedArray[Seq[String]]].array).show
<console>:55: error: Unable to find encoder for type stored in a Dataset.  Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._  Support for serializing other types will be added in future releases.
comboDF.map(row => row.get(2).asInstanceOf[WrappedArray[Seq[String]]].array).show

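Using an RDD

To work around the DataFrame's apparent inability to handle nested wrapped arrays, I tried RDDs (which I'm not very familiar with).

I can get the keyword combinations for each row with: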

val wrappedPairs = listDF.select("keywords").
  rdd.collect.map(r =>
    combine(r.get(0).asInstanceOf[WrappedArray[String]].array.toList)
  )
Array[List[Seq[String]]] = Array(List(List(cat, mouse)), List(List(bat, cat)), List(List(cat, house), List(cat, mouse), List(house, mouse)))

This basically gives me:

Array(
  List(
    List(cat, mouse)
  ),
  List(
    List(bat, cat)
  ),
  List(
    List(cat, house),
    List(cat, mouse),
    List(house, mouse)
  )
)

What I want to get to is:

+-------+--------+
| word1 |  word2 |
+-------+--------+
| cat   |  mouse |
| bat   |  cat   |
| cat   |  house |
| cat   |  mouse |
| house |  mouse |
+-------+--------+

I can print the pairs with println, but I can't figure out how to get them out as rows:

wrappedPairs.map(outerList => outerList.asInstanceOf[List[List[String]]].
map(innerList => innerList.asInstanceOf[List[String]]).
map(pair => (pair(0),pair(1).toSeq)).foreach(println)
)

wrappedPairs: Array[List[Seq[String]]] = Array(List(List(cat, mouse)), List(List(bat, cat)), List(List(cat, house), List(cat, mouse), List(house, mouse)))
(cat,mouse)
(bat,cat)
(cat,house)
(cat,mouse)
(house,mouse)

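You're very close! The two things you need are explode and a way to project the elements out of the resulting array column. explode expands the list of items in one row into separate rows, duplicating all the other fields. After exploding, you need to select the values out of the array in "combinations". There are a few ways to do this, but I usually do it the way shown below.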
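
To make the "duplicating all the other fields" part concrete, here is a minimal sketch of explode on its own, assuming a local SparkSession. The names source and perKeyword and the two hard-coded rows are illustrative, not from the question's actual DataFrame.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.explode

// In spark-shell a session named `spark` already exists; it is created here
// only to keep the sketch self-contained.
val spark = SparkSession.builder().master("local[*]").appName("explode-sketch").getOrCreate()
import spark.implicits._

// Two illustrative rows shaped like the question's input.
val source = Seq(
  (8, Seq("mouse", "cat")),
  (9, Seq("bat", "cat"))
).toDF("id", "keywords")

// explode emits one output row per array element; the id value is repeated on each.
val perKeyword = source.select($"id", explode($"keywords").as("keyword"))
perKeyword.show()
```

Each id appears once per keyword in the result, which is the same mechanism that later turns an array of pairs into one row per pair.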

Take a look at this sample code:

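import org.apache.spark.sql.functions.{explode, udf}
import spark.implicits._ // already in scope in spark-shell; needed for toDF and $"..."

val input = Seq(
  (Seq("mouse", "cat")),
  (Seq("bat", "cat")),
  (Seq("mouse", "house", "cat"))
).toDF("keywords")
// Sort first so each pair comes out in one canonical order.
def combine(items: Seq[String]) = {
  items.sorted.combinations(2).toList
}
val combineUDF = udf(combine _)
// explode turns the array of pairs into one row per pair.
val df = input.withColumn("combinations", explode(combineUDF($"keywords")))
df.show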

That gives you a DataFrame like this:

+-------------------+--------------+
|           keywords|  combinations|
+-------------------+--------------+
|       [mouse, cat]|  [cat, mouse]|
|         [bat, cat]|    [bat, cat]|
|[mouse, house, cat]|  [cat, house]|
|[mouse, house, cat]|  [cat, mouse]|
|[mouse, house, cat]|[house, mouse]|
+-------------------+--------------+

Now you just need to select the two elements from the array in each row, like so:

val df2 = df.selectExpr("combinations[0] as word1", "combinations[1] as word2")
df2.show

Output:

+-----+-----+
|word1|word2|
+-----+-----+
|  cat|mouse|
|  bat|  cat|
|  cat|house|
|  cat|mouse|
|house|mouse|
+-----+-----+
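
The same projection can also be written with the Column API instead of a SQL expression string, and since the stated goal was word co-occurrence, a groupBy on the two word columns would turn the pairs into counts. This is a minimal sketch, assuming a local SparkSession; the names exploded, pairs and counts are illustrative, and the rows are hard-coded to match the exploded output shown earlier.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// In spark-shell a session named `spark` already exists; it is created here
// only to keep the sketch self-contained.
val spark = SparkSession.builder().master("local[*]").appName("pairs-sketch").getOrCreate()
import spark.implicits._

// Hard-coded stand-in for the exploded DataFrame: one sorted pair per row.
val exploded = Seq(
  Seq("cat", "mouse"),
  Seq("bat", "cat"),
  Seq("cat", "house"),
  Seq("cat", "mouse"),
  Seq("house", "mouse")
).toDF("combinations")

// Column.getItem(i) selects the i-th array element, like combinations[i] in selectExpr.
val pairs = exploded.select(
  col("combinations").getItem(0).as("word1"),
  col("combinations").getItem(1).as("word2")
)

// Counting identical (word1, word2) rows gives the co-occurrence frequency.
val counts = pairs.groupBy("word1", "word2").count()
counts.show()
```

Because combine sorts each row's keywords first, (cat, mouse) and (mouse, cat) always land in the same group.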
