如何确定Spark中shuffle分区的最佳数量



我正在EMR中运行一个spark结构的流式作业(每天都会反弹(。在执行了几个小时后,我的应用程序中出现OOM错误,并被杀死。以下是我的配置和spark SQL代码。我是Spark的新手,需要您的宝贵意见。

EMR有10个实例,具有16核和64GB内存。

Spark Submit参数:

num_of_executors: 17
executor_cores: 5
executor_memory: 19G
driver_memory: 30G

作业是以30秒的间隔从Kafka中以微批的形式读取输入。每个批次读取的平均行数为90k。

spark.streaming.kafka.maxRatePerPartition: 4500
spark.streaming.stopGracefullyOnShutdown: true
spark.streaming.unpersist: true
spark.streaming.kafka.consumer.cache.enabled: true
spark.hadoop.fs.s3.maxRetries: 30 
spark.sql.shuffle.partitions: 2001

Spark SQL聚合代码:

dataset.groupBy(functions.col(NAME),functions.window(functions.column(TIMESTAMP_COLUMN),30))
.agg(functions.concat_ws(SPLIT, functions.collect_list(DEPARTMENT)).as(DEPS))
.select(NAME,DEPS)
.map((row) -> {
Map<String, Object> map = Maps.newHashMap();
map.put(NAME, row.getString(0));
map.put(DEPS, row.getString(1));
return new KryoMapSerializationService().serialize(map);
}, Encoders.BINARY());

来自驱动程序的一些日志:

20/04/04 13:10:51 INFO TaskSetManager: Finished task 1911.0 in stage 1041.0 (TID 1052055) in 374 ms on <host> (executor 3) (1998/2001)
20/04/04 13:10:52 INFO TaskSetManager: Finished task 1925.0 in stage 1041.0 (TID 1052056) in 411 ms on  <host> (executor 3) (1999/2001)
20/04/04 13:10:52 INFO TaskSetManager: Finished task 1906.0 in stage 1041.0 (TID 1052054) in 776 ms on  <host> (executor 3) (2000/2001)
20/04/04 13:11:04 INFO YarnSchedulerBackend$YarnDriverEndpoint: Disabling executor 3.
20/04/04 13:11:04 INFO DAGScheduler: Executor lost: 3 (epoch 522)
20/04/04 13:11:04 INFO BlockManagerMasterEndpoint: Trying to remove executor 3 from BlockManagerMaster.
20/04/04 13:11:04 INFO BlockManagerMasterEndpoint: Removing block manager BlockManagerId(3,  <host>, 38533, None)
20/04/04 13:11:04 INFO BlockManagerMaster: Removed 3 successfully in removeExecutor
20/04/04 13:11:04 INFO YarnAllocator: Completed container container_1582797414408_1814_01_000004 on host:  <host> (state: COMPLETE, exit status: 143)

顺便说一句,我在我的forEachBatch代码中使用collectasList

List<Event> list = dataset.select("value")
.selectExpr("deserialize(value) as rows")
.select("rows.*")
.selectExpr(NAME, DEPS)
.as(Encoders.bean(Event.class))
.collectAsList();

使用这些设置,您可能会导致自己的问题。

num_of_executors: 17
executor_cores: 5
executor_memory: 19G
driver_memory: 30G

您基本上是在这里创建额外的容器,以便在它们之间来回切换。相反,从10个执行器、15个内核、60克内存开始。如果这是有效的,那么你可以玩这些游戏来尝试优化性能。我通常会尝试每一步将我的容器一分为二(但自从spark 2.0以来,我也不需要这样做(

让Spark SQL将默认值保持在200。你把它分解得越多,你就越能让Spark计算洗牌。如果有什么不同的话,我会尝试使用与执行器相同数量的并行性,所以在本例中只有10个。当2.0问世时,这就是调优配置单元查询的方式。让工作变得复杂到难以分解,这会给主人带来所有的负担。

使用数据集和编码通常也不如使用直接的DataFrame操作那样高效。我发现将其分解为数据帧操作在性能上有很大的提升。

相关内容

  • 没有找到相关文章

最新更新