我正在处理一组相当大的txt文件,每个文件有几个100MB。我想做的是复制这些,在那里我用函数MapFunc
映射每一行。下面是我的第一次尝试,速度非常慢。我确信问题出在reduce函数上,它连接了这个巨大的字符串。
行写入outputFile
的顺序并不重要,但它们不能重叠。我已经看了Spark的saveAsTextFile
,但据我所知,我不能指定文件名,只能指定目录,这对我的用例没有用处。此外,在RDD的元素之间添加页眉和页脚以及逗号怎么样?如果能为我提供如何将此应用程序调整到最佳性能的建议,我将不胜感激。
val conf = new SparkConf().setAppName("MyApp")
val sc = new SparkContext(conf)
val input = sc.textFile(file)
val lines = input.filter(s => filterFunc(s)).map(s => MapFunc(s))
val output = lines.reduce((a, b) => a + ',' + b)
val outputFile = new File(outFile)
val writer = new BufferedWriter(new FileWriter(outputFile))
val buf = new StringBuilder
buf ++= "header"
buf ++= output
buf ++= "footer"
writer.append(buf)
writer.flush()
writer.close()
编辑:我的文件是简单的csv文件。他们可以有评论(#)。此外,我需要确保只处理具有3列的文件,因为用户可以提交自己的文件进行处理。这是由FilterFunc
完成的,老实说,它不排除整个文件,只排除不符合条件的行。一个简单的例子如下:
# File A
# generated mm/dd/yyyy
field11,field12,field13
field21,field22,field23
field31,field32,field33
输出填充看起来像这样:
$header
map(line1),
map(line2),
map(line3)
$footer
saveAsTextFile
真的很接近我想要的。但正如前面所说的,我可以控制输出文件的文件名和位置,这对我来说很重要。
与其使用临时缓冲区buf
,不如考虑直接写入文件。
val writer = new PrintWriter(new File(outFile))
writer.print("header")
writer.print(output)
writer.print("footer")
writer.flush()
writer.close()
您可以避免串联,也可以避免占用buf
的内存。