我在一个目录中有很多文件,每个文件都包含跨多行的文本。目前,我使用以下代码将所有这些文件读取到 Spark 数据集 (>2.0)
val ddf = spark.read.text("file:///input/*")
但是,这将创建一个数据集,其中每行都是一行,而不是一个文件。我想在数据集中的每一行都有每个文件(字符串)。
如何在不迭代每个文件并将其作为RDD
单独读取的情况下实现这一点?
在SparkContext
上使用wholeTextFiles()
val rdd: RDD[(String, String)] = spark.sparkContext
.wholeTextFiles("file/path/to/read/as/rdd")
SparkContext.wholeTextFiles 允许您读取包含以下内容的目录 多个小文本文件,并返回每个文件作为 (文件名, 内容)对。这与文本文件相反,文本文件将返回 每个文件中每行一条记录。
@mrsrinivas 答案的另一种方法是按input_file_name
分组。给定结构:
evan@vbox>~/junk/so> find .
.
./d2
./d2/t.txt
./d1
./d1/t.txt
evan@vbox>~/junk/so> cat */*.txt
d1_1
d1_2
d2_1
d2_2
我们可以根据输入文件收集列表,如下所示:
scala> val ddf = spark.read.textFile("file:///home/evan/junk/so/*").
| select($"value", input_file_name as "fName")
ddf: org.apache.spark.sql.DataFrame = [value: string, fName: string]
scala> ddf.show(false)
+-----+----------------------------------+
|value|fName |
+-----+----------------------------------+
|d2_1 |file:///home/evan/junk/so/d2/t.txt|
|d2_2 |file:///home/evan/junk/so/d2/t.txt|
|d1_1 |file:///home/evan/junk/so/d1/t.txt|
|d1_2 |file:///home/evan/junk/so/d1/t.txt|
+-----+----------------------------------+
scala> ddf.groupBy("fName").agg(collect_list($"value") as "value").
| drop("fName").show
+------------+
| value|
+------------+
|[d1_1, d1_2]|
|[d2_1, d2_2]|
+------------+