我有一个Spark job,它对ORC数据进行一些处理,并使用Spark 1.4.0中引入的DataFrameWriter save() API存储ORC数据。我有下面这段代码,它使用了大量shuffle内存。我如何优化下面的代码?有什么问题吗?它像预期的那样工作得很好,只是由于GC暂停和打乱大量数据而导致速度变慢,从而遇到内存问题。我是Spark的新手。
JavaRDD<Row> updatedDsqlRDD = orderedFrame.toJavaRDD().coalesce(1, false).map(new Function<Row, Row>() {
@Override
public Row call(Row row) throws Exception {
List<Object> rowAsList;
Row row1 = null;
if (row != null) {
rowAsList = iterate(JavaConversions.seqAsJavaList(row.toSeq()));
row1 = RowFactory.create(rowAsList.toArray());
}
return row1;
}
}).union(modifiedRDD);
DataFrame updatedDataFrame = hiveContext.createDataFrame(updatedDsqlRDD,renamedSourceFrame.schema());
updatedDataFrame.write().mode(SaveMode.Append).format("orc").partitionBy("entity", "date").save("baseTable");
编辑
根据建议,我尝试使用mapPartitionsWithIndex
()将上面的代码转换为以下代码,但我仍然看到数据变换,它比上面的代码更好,但仍然失败,因为它达到了GC限制,抛出OOM或进入GC暂停很长时间和超时,YARN将杀死执行器。
我使用spark.storage.memoryFraction为0.5和spark.shuffle.memoryFraction为0.4;我尝试使用默认设置,并更改了许多组合,但没有任何帮助。
JavaRDD<Row> indexedRdd = sourceRdd.cache().mapPartitionsWithIndex(new Function2<Integer, Iterator<Row>, Iterator<Row>>() {
@Override
public Iterator<Row> call(Integer ind, Iterator<Row> rowIterator) throws Exception {
List<Row> rowList = new ArrayList<>();
while (rowIterator.hasNext()) {
Row row = rowIterator.next();
List<Object> rowAsList = iterate(JavaConversions.seqAsJavaList(row.toSeq()));
Row updatedRow = RowFactory.create(rowAsList.toArray());
rowList.add(updatedRow);
}
return rowList.iterator();
}
}, true).coalesce(200,true);
将RDD或Dataframe合并到单个分区意味着您的所有处理都发生在一台机器上。由于各种原因,这不是一件好事:所有的数据都必须在网络中进行洗牌,没有更多的并行性,等等。相反,您应该查看其他操作符,如reduceByKey, mapPartitions,或者除了将数据合并到单个机器之外的其他操作符。
注意:看看你的代码,我看不出为什么你把它带到一台机器上,你可能只需要删除那部分