如何拆分一行文本并将该行附加到 Scala/Spark 中的每个元素



给定一个像"苹果从树上掉下来"这样的字符串,我如何拆分它,以便每个单词都附加一行文本,以便我得到一个字符串的RDD,如下所示:

"The | The apple fell from a tree"
"apple | The apple fell from a tree"
"fell | The apple fell from a tree"
"from | The apple fell from a tree"
"a | The apple fell from a tree"
"tree | The apple fell from a tree"

这将使我能够跟踪这个词的来源。

这是我写的(相关部分)

var inputPath = /path/to/file.txt // Some txt file
var input = sc.textFile(inputPath) // RDD of lines of text
var words = input.flatMap(line => line.split(" ").foreach(word => word.concat(" | " + line)) 

这个代码示例不起作用,因为据我了解,您不能在平面地图中多次遍历?我相信我得到一个错误Found: Unit Required: TraversableOnce[?]说我是Spark,Scala和函数式编程的新手。 第一次写 scala,我不太担心性能,或者最少的代码量等。我只是想要一些工作,而不必重新设计我的实现。我以后总是可以重构的。

我知道textFile()给了我一个RDD,其中包含表示文本每一行的字符串。 flatMap 用 " " 来拆分这些行,由于它是一个 flatMap,我们得到一个数组,而不是一堆数组。 如果我错了,或者说得不正确,请纠正我。

我手头没有火花,所以刚才无法确认,但查看代码和错误消息,很可能只是foreach

因此,快速修复(可能)是将最后一行替换为

input.flatMap(line => line.split(" ").map(word => word.concat(" | " + line))

解释:

  • line.split给你一个Array[String],我相信这是一个实例Traversable[String]
  • foreach对每个项目应用一个函数,但返回一个Unit- 这意味着调用中没有返回值(实际上它是Unit类型的单例实例,但如果它有帮助,你可以把它看作是 Java 术语中的void)
  • map还会对每个项应用一个函数,但返回包含更新项的新Traversable(可能具有不同的具体类型)。
  • 最后,flatMap是一种本质上结合了mapflatten的方法 - 即它需要一个函数来获取一个项目并返回一个Traversable[OtherType],将该函数应用于每个项目,然后通过连接内部可遍历对象来"展平"生成的Traversable[Traversable[OtherType]]。所以你需要给它String => Traversable[String],但你通过了String => Unit

有关更多信息,请参阅 Scala Traversable 文档。

纯字符串列表上的类似代码:

scala> List(
"line1 word1 word2", 
"line2 word3 word4"
)
.flatMap(line => line.split(" ").map(word => s"$word | $line"))
res5: List[String] = List(
line1 | line1 word1 word2,
word1 | line1 word1 word2,
word2 | line1 word1 word2,
line2 | line2 word3 word4,
word3 | line2 word3 word4,
word4 | line2 word3 word4
)

顺便说一句,Scala 鼓励不变性,所以你可能想使用val而不是var,除非你真的想重新分配值 -val或多或少类似于final.在您的代码示例中,您肯定可以安全地将vars 替换为vals。

最新更新