我的输入数据集大约是150G。我设置
--conf spark.cores.max=100
--conf spark.executor.instances=20
--conf spark.executor.memory=8G
--conf spark.executor.cores=5
--conf spark.driver.memory=4G
但是由于数据不是均匀分布在执行器上,所以我一直得到
Container killed by YARN for exceeding memory limits. 9.0 GB of 9 GB physical memory used
以下是我的问题:
1. Did I not set up enough memory in the first place? I think 20 * 8G > 150G, but it's hard to make perfect distribution, so some executors will suffer
2. I think about repartition the input dataFrame, so how can I determine how many partition to set? the higher the better, or?
3. The error says "9 GB physical memory used", but i only set 8G to executor memory, where does the extra 1G come from?
谢谢!
当使用yarn时,还有另一个设置,用于计算执行器的yarn容器请求的大小:
spark.yarn.executor.memoryOverhead
默认为0.1 *执行器内存设置。它定义了除了指定为执行器的内存之外,还需要多少额外的内存开销。试着先增加这个数字。
同样,纱线容器不会给你任意大小的内存。它将只返回分配的内存大小为其最小分配大小的倍数的容器,该最小分配大小由以下设置控制:
yarn.scheduler.minimum-allocation-mb
设置一个较小的数字将减少你"超过"你所要求的金额的风险。
我还通常将下面的键设置为比我想要的容器大小大的值,以确保spark请求控制我的执行器的大小,而不是yarn践踏它们。这是纱线能出的最大集装箱尺寸。
nodemanager.resource.memory-mb
9GB由您作为参数添加的8GB执行器内存组成,spark.yarn.executor.memoryOverhead
设置为.1
,因此容器的总内存为spark.yarn.executor.memoryOverhead + (spark.yarn.executor.memoryOverhead * spark.yarn.executor.memoryOverhead)
,即8GB + (.1 * 8GB) ≈ 9GB
。
您可以使用单个执行器运行整个进程,但这将花费很长时间。要理解这一点,您需要了解分区和任务的概念。分区的数量由您的输入和操作定义。例如,如果你从hdfs读取一个150gb的csv文件,而你的hdfs块大小是128mb,你最终会得到150 * 1024 / 128 = 1200
分区,它直接映射到Spark UI中的1200个任务。
每个单独的任务都将由执行器拾取。你不需要占用所有150gb的内存。例如,当您只有一个执行器时,您显然不会从Spark的并行功能中受益,但它会从第一个任务开始,处理数据,并将其保存回dfs,然后开始处理下一个任务。
检查内容:
- 输入分区有多大?输入文件是否可分割?如果单个执行器必须加载大量内存,那么它肯定会耗尽内存。
- 你在执行什么样的操作?例如,如果您使用非常低的基数进行连接,那么您最终会得到大量分区,因为所有具有特定值的行最终都在相同的分区中。
- 执行了非常昂贵或低效的操作?任意笛卡尔积等
希望这对你有帮助。引发快乐!