在Spark中将多个小文件合并为几个较大的文件



I通过Spark使用hive。我的spark代码中有一个插入到分区表的查询。输入数据为200 gb以上。当Spark写入分区表时,它会吐出非常小的文件(以kb为单位的文件)。所以现在输出分区表文件夹有5000+个小kb文件。我想把这些合并到几个大MB的文件中,可能是几个200mb的文件。我厌倦了使用配置单元合并设置,但它们似乎不起作用。

'val result7A = hiveContext.sql("set hive.exec.dynamic.partition=true")
 val result7B = hiveContext.sql("set hive.exec.dynamic.partition.mode=nonstrict")
val result7C = hiveContext.sql("SET hive.merge.size.per.task=256000000")
val result7D = hiveContext.sql("SET hive.merge.mapfiles=true")
val result7E = hiveContext.sql("SET hive.merge.mapredfiles=true")
val result7F = hiveContext.sql("SET hive.merge.sparkfiles = true")
val result7G = hiveContext.sql("set hive.aux.jars.path=c:\Applications\json-serde-1.1.9.3-SNAPSHOT-jar-with-dependencies.jar")
val result8 = hiveContext.sql("INSERT INTO TABLE partition_table PARTITION (date) select a,b,c from partition_json_table")'

上面的配置单元设置在mapreduce配置单元执行中工作,并吐出指定大小的文件。有没有选择Spark或Scala?

我遇到了同样的问题。解决方案是添加带有分区列的DISTRIBUTE BY子句。这样可以确保一个分区的数据进入单个reducer。示例:

INSERT INTO TABLE partition_table PARTITION (date) select a,b,c from partition_json_table DISTRIBUTE BY date

您可能需要尝试使用DataFrame.coalence方法;它返回一个具有指定分区数的DataFrame(每个分区在插入时变成一个文件)。因此,使用插入的记录数量和每条记录的典型大小,如果您想要大约200MB的文件,您可以估计要合并到多少分区。

数据帧重分区(1)方法在这种情况下有效。

相关内容

  • 没有找到相关文章

最新更新