Avoiding a single file per partition when using the hive.optimize.sort.dynamic.partition option

I am using Hive.

When I write dynamic partitions with an INSERT query and turn on the hive.optimize.sort.dynamic.partition option (SET hive.optimize.sort.dynamic.partition=true), there is always exactly one file in each partition.

But if I turn the option off (SET hive.optimize.sort.dynamic.partition=false), I get an out-of-memory exception like this:

TaskAttempt 3 failed, info=[Error: Error while running task ( failure ) : attempt_1534502930145_6994_1_01_000008_3:java.lang.RuntimeException: java.lang.OutOfMemoryError: Java heap space
at org.apache.hadoop.hive.ql.exec.tez.TezProcessor.initializeAndRunProcessor(TezProcessor.java:194)
at org.apache.hadoop.hive.ql.exec.tez.TezProcessor.run(TezProcessor.java:168)
at org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run(LogicalIOProcessorRuntimeTask.java:370)
at org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:73)
at org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:61)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
at org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:61)
at org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:37)
at org.apache.tez.common.CallableWithNdc.call(CallableWithNdc.java:36)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.OutOfMemoryError: Java heap space
at org.apache.parquet.column.values.dictionary.IntList.initSlab(IntList.java:90)
at org.apache.parquet.column.values.dictionary.IntList.<init>(IntList.java:86)
at org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.<init>(DictionaryValuesWriter.java:93)
at org.apache.parquet.column.values.dictionary.DictionaryValuesWriter$PlainBinaryDictionaryValuesWriter.<init>(DictionaryValuesWriter.java:229)
at org.apache.parquet.column.ParquetProperties.dictionaryWriter(ParquetProperties.java:131)
at org.apache.parquet.column.ParquetProperties.dictWriterWithFallBack(ParquetProperties.java:178)
at org.apache.parquet.column.ParquetProperties.getValuesWriter(ParquetProperties.java:203)
at org.apache.parquet.column.impl.ColumnWriterV1.<init>(ColumnWriterV1.java:83)
at org.apache.parquet.column.impl.ColumnWriteStoreV1.newMemColumn(ColumnWriteStoreV1.java:68)
at org.apache.parquet.column.impl.ColumnWriteStoreV1.getColumnWriter(ColumnWriteStoreV1.java:56)
at org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.<init>(MessageColumnIO.java:184)
at org.apache.parquet.io.MessageColumnIO.getRecordWriter(MessageColumnIO.java:376)
at org.apache.parquet.hadoop.InternalParquetRecordWriter.initStore(InternalParquetRecordWriter.java:109)
at org.apache.parquet.hadoop.InternalParquetRecordWriter.<init>(InternalParquetRecordWriter.java:99)
at org.apache.parquet.hadoop.ParquetRecordWriter.<init>(ParquetRecordWriter.java:100)
at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:327)
at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:288)
at org.apache.hadoop.hive.ql.io.parquet.write.ParquetRecordWriterWrapper.<init>(ParquetRecordWriterWrapper.java:67)
at org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat.getParquerRecordWriterWrapper(MapredParquetOutputFormat.java:128)
at org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat.getHiveRecordWriter(MapredParquetOutputFormat.java:117)
at org.apache.hadoop.hive.ql.io.HiveFileFormatUtils.getRecordWriter(HiveFileFormatUtils.java:286)
at org.apache.hadoop.hive.ql.io.HiveFileFormatUtils.getHiveRecordWriter(HiveFileFormatUtils.java:271)
at org.apache.hadoop.hive.ql.exec.FileSinkOperator.createBucketForFileIdx(FileSinkOperator.java:619)
at org.apache.hadoop.hive.ql.exec.FileSinkOperator.createBucketFiles(FileSinkOperator.java:563)
at org.apache.hadoop.hive.ql.exec.FileSinkOperator.createNewPaths(FileSinkOperator.java:867)
at org.apache.hadoop.hive.ql.exec.FileSinkOperator.getDynOutPaths(FileSinkOperator.java:975)
at org.apache.hadoop.hive.ql.exec.FileSinkOperator.process(FileSinkOperator.java:715)
at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:897)
at org.apache.hadoop.hive.ql.exec.SelectOperator.process(SelectOperator.java:95)
at org.apache.hadoop.hive.ql.exec.tez.ReduceRecordSource$GroupIterator.next(ReduceRecordSource.java:356)
at org.apache.hadoop.hive.ql.exec.tez.ReduceRecordSource.pushRecord(ReduceRecordSource.java:287)
at org.apache.hadoop.hive.ql.exec.tez.ReduceRecordProcessor.run(ReduceRecordProcessor.java:317)
]], Vertex did not succeed due to OWN_TASK_FAILURE, failedTasks:1 killedTasks:299, Vertex vertex_1534502930145_6994_1_01 [Reducer 2] killed/failed due to:OWN_TASK_FAILURE]Vertex killed, vertexName=Map 1, vertexId=vertex_1534502930145_6994_1_00, diagnostics=[Vertex received Kill while in RUNNING state., Vertex did not succeed due to OTHER_VERTEX_FAILURE, failedTasks:0 killedTasks:27, Vertex vertex_1534502930145_6994_1_00 [Map 1] killed/failed due to:OTHER_VERTEX_FAILURE]DAG did not succeed due to VERTEX_FAILURE. failedVertices:1 killedVertices:1

I guess this exception occurs because the reducers write to many partitions simultaneously, but I cannot find out how to control it. I followed this article, but it did not help me.

My environment:

  • AWS EMR 5.12.1
  • Tez as the execution engine
  • Hive version 2.3.2, Tez version 0.8.2
  • HDFS block size of 128 MB
  • About 30 dynamic partitions to write with the INSERT query

Here is my sample query.

SET hive.exec.dynamic.partition.mode=nonstrict;
SET hive.optimize.sort.dynamic.partition=true;
SET hive.exec.reducers.bytes.per.reducer=1048576;
SET mapred.reduce.tasks=300;
FROM raw_data
INSERT OVERWRITE TABLE idw_data
PARTITION(event_timestamp_date)
SELECT
*
WHERE 
event_timestamp_date BETWEEN '2018-09-09' AND '2018-10-09' 
DISTRIBUTE BY event_timestamp_date
;

distribute by partition key helps with the OOM issue, but this configuration can cause each reducer to write an entire partition, depending on the hive.exec.reducers.bytes.per.reducer setting, which by default can be a very high value such as 1 GB. distribute by partition key may also add an extra reduce stage, just as hive.optimize.sort.dynamic.partition does.

So, to avoid OOM and achieve maximum performance:

  1. Add distribute by partition key at the end of your INSERT query. This makes rows with the same partition key be processed by the same reducer. Alternatively, or in addition to this, you can use hive.optimize.sort.dynamic.partition=true.
  2. Set hive.exec.reducers.bytes.per.reducer low enough to trigger more reducers if one partition holds too much data. Just check the current value of hive.exec.reducers.bytes.per.reducer and decrease or increase it accordingly to get the right reducer parallelism (see the snippet after the example below). This setting determines how much data a single reducer processes and how many files are created per partition.

Example:

set hive.exec.reducers.bytes.per.reducer=33554432;
-- the table name was missing in the original; target_table is a placeholder
insert overwrite table target_table partition (load_date)
select * from src_table
distribute by load_date;
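
To check the current value first, running SET with just the property name prints it in the Hive shell:

-- prints e.g. hive.exec.reducers.bytes.per.reducer=256000000 (the Hive 2.x default)
SET hive.exec.reducers.bytes.per.reducer;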

Also see this answer about controlling the number of mappers and reducers: https://stackoverflow.com/a/42842117/2700344

Finally, I found the problem.

First of all, the execution engine is Tez, so the mapreduce.reduce.memory.mb option does not help. You should use the hive.tez.container.size option instead. When writing dynamic partitions, a reducer opens multiple record writers, so it needs enough memory to write to multiple partitions simultaneously.
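
For example, a minimal sketch of the memory settings (both values are illustrative assumptions: the container size must fit within your cluster's YARN limits, and the JVM heap should stay somewhat below the container size):

-- Tez container size in MB (4096 is an illustrative value)
SET hive.tez.container.size=4096;
-- JVM heap for Tez tasks, roughly 80% of the container (illustrative)
SET hive.tez.java.opts=-Xmx3276m;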

If you use the hive.optimize.sort.dynamic.partition option, a global sort by the partition key is performed, and sorting implies reducers. In that case, if there are no other reducer tasks, each partition is handled by a single reducer, which is why there is only one file per partition. DISTRIBUTE BY creates more reduce tasks, so it can create more files in each partition, but the same memory problem remains.
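
As an illustration (my sketch, not part of the original post): to get several files per partition while still bounding how many partitions each reducer writes, you can distribute by the partition key plus a random bucket. The bucket count of 4 is arbitrary:

SET hive.exec.dynamic.partition.mode=nonstrict;
FROM raw_data
INSERT OVERWRITE TABLE idw_data
PARTITION(event_timestamp_date)
SELECT
*
WHERE
event_timestamp_date BETWEEN '2018-09-09' AND '2018-10-09'
-- FLOOR(RAND()*4) spreads each date over up to 4 reducers,
-- so each partition ends up with up to 4 files
DISTRIBUTE BY event_timestamp_date, FLOOR(RAND()*4)
;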

So the container memory size is very important! Don't forget to use the hive.tez.container.size option to change the Tez container memory size!
