Apache Spark:在reduce函数中构建文件/字符串



我正在处理一组相当大的txt文件,每个文件有几个100MB。我想做的是复制这些,在那里我用函数MapFunc映射每一行。下面是我的第一次尝试,速度非常慢。我确信问题出在reduce函数上,它连接了这个巨大的字符串。

行写入outputFile的顺序并不重要,但它们不能重叠。我已经看了Spark的saveAsTextFile,但据我所知,我不能指定文件名,只能指定目录,这对我的用例没有用处。此外,在RDD的元素之间添加页眉和页脚以及逗号怎么样?如果能为我提供如何将此应用程序调整到最佳性能的建议,我将不胜感激。

val conf = new SparkConf().setAppName("MyApp")
val sc = new SparkContext(conf)
val input = sc.textFile(file)
val lines = input.filter(s => filterFunc(s)).map(s => MapFunc(s))
val output = lines.reduce((a, b) => a + ',' + b)
val outputFile = new File(outFile)
val writer = new BufferedWriter(new FileWriter(outputFile))
val buf = new StringBuilder
buf ++= "header"
buf ++= output
buf ++= "footer"
writer.append(buf)
writer.flush()
writer.close()

编辑:我的文件是简单的csv文件。他们可以有评论(#)。此外,我需要确保只处理具有3列的文件,因为用户可以提交自己的文件进行处理。这是由FilterFunc完成的,老实说,它不排除整个文件,只排除不符合条件的行。一个简单的例子如下:

# File A
# generated mm/dd/yyyy
field11,field12,field13
field21,field22,field23
field31,field32,field33

输出填充看起来像这样:

$header
map(line1),
map(line2),
map(line3)
$footer

saveAsTextFile真的很接近我想要的。但正如前面所说的,我可以控制输出文件的文件名和位置,这对我来说很重要。

与其使用临时缓冲区buf,不如考虑直接写入文件。

val writer = new PrintWriter(new File(outFile))
writer.print("header")
writer.print(output)
writer.print("footer")
writer.flush()
writer.close()

您可以避免串联,也可以避免占用buf的内存。

相关内容

  • 没有找到相关文章

最新更新