如何在Spark中使用Scala生成键值格式



我正在VirtualBox上学习Spark。我用/bin/spark-shell打开spark并使用Scala。现在我对使用Scala的键值格式感到困惑。

我在home/feng/spark/data中有一个txt文件,看起来像:

panda 0
pink 3
pirate 3
panda 1
pink 4

我使用sc.textFile来获取这个txt文件。如果我做

val rdd = sc.textFile("/home/feng/spark/data/rdd4.7")

然后我可以使用rdd.collect()在屏幕上显示rdd:

scala> rdd.collect()
res26: Array[String] = Array(panda 0, pink 3, pirate 3, panda 1, pink 4)

但是,如果我做

val rdd = sc.textFile("/home/feng/spark/data/rdd4.7.txt")

此处没有".txt"。然后当我使用rdd.collect()时,我得到了一个错误:

org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/home/feng/spark/A.txt
    at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:285)
......

但我看到了其他的例子。所有这些文件的末尾都有".txt"。我的代码或系统有问题吗?

另一件事是当我尝试做的时候:

scala> val rddd = rdd.map(x => (x.split(" ")(0),x))
rddd: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[2] at map at <console>:29
scala> rddd.collect()
res0: Array[(String, String)] = Array((panda,panda 0), (pink,pink 3), (pirate,pirate 3), (panda,panda 1), (pink,pink 4))

我打算选择数据的第一列并将其用作键。但是rddd.collect()看起来不是这样的,因为单词出现了两次,这是不对的。我不能继续做其他操作,比如mapbykey、reducebykey或其他操作。我哪里做错了?

例如,我用数据集创建了一个String,之后我逐行拆分记录,并使用SparkContextparallelize方法创建RDD。请注意,在创建RDD之后,我使用其map方法来拆分存储在每个记录中的String,并将其转换为Row

import org.apache.spark.sql.Row
val text = "panda 0npink 3npirate 3npanda 1npink 4"
val rdd = sc.parallelize(text.split("n")).map(x => Row(x.split(" "):_*))
rdd.take(3)

take方法的输出为:

res4: Array[org.apache.spark.sql.Row] = Array([panda,0], [pink,3], [pirate,3])

关于你的第一个问题,文件不需要任何扩展名。因为,在这种情况下,文件被视为纯文本。

最新更新