有没有办法在 Spark(使用 Scala)中按自定义分隔符拆分而不是逐行读取,以读取一组键、值对



我有一个格式的输入.txt文件。

Record  
ID||1  
Word||ABC   
Language||English   
Count||2   
Record  
ID||2  
Word||DEF  
Language||French  
Count||4 

等等。

我是Apache Spark/Scala的新手。

我看到有一些选项可以使用.textFile方法逐行读取文件,或者使用.wholeTextFile方法读取整个文件。我们还可以读取CSV格式的文件。

但是,假设我想读取这样一个文件并从中创建一个案例类,其中将包含成员 ID、单词、语言、计数,我该怎么做?

假设输入格式一致(没有随机空格,始终以"Record"结尾),则以下代码有效。

关键在于Hadoop配置"textinputformat.record.delimiter"

case class Foo(ID : Long, Word : String, Language : String, Count : Long)

.

val conf = new SparkConf()
conf.setMaster("local[*]")
conf.setAppName("stackOverflow")
val sc = new SparkContext(conf)
sc.hadoopConfiguration.set("textinputformat.record.delimiter","Recordn")
val rdd = sc.textFile("C:\TEMP\stack.txt")
  .flatMap(record => {
    if (record.isEmpty) None //needed to remove first empty string delimited by "Recordn"
    else {
      val lines = record.split("n").map(_.split("\|\|"))
      //lines.foreach(x=>println(x.mkString(",")))
      Some(Foo(
        lines(0)(1).toLong,
        lines(1)(1),
        lines(2)(1),
        lines(3)(1).toLong
      ))
    }
  })
rdd.foreach(println)

输出为

Foo(2,DEF,French,4)
Foo(1,ABC,English,2)

最新更新