在具有 75GB 内存的 EMR 集群上"Container killed by YARN for exceeding memory limits. 10.4 GB of 10.4 GB physic



我在AWS EMR上运行一个5个节点spark群集,每个尺寸m3.xlarge(1主4名从)。我成功运行了146MB的BZIP2压缩CSV文件,最终得到了完美的结果。

现在,我正在尝试在此集群上处理〜5GB BZIP2 CSV文件,但我正在收到此错误:

16/11/23 17:29:53警告taskSetManager:丢失任务49.2阶段6.0(tid xxx,xxx.xxx.xxx.xxx.compute.internal):executorlostfailure(executor 16由运行任务之一引起的executor 16)原因:因超过记忆限制而被纱杀死的容器。10.4 GB的10.4 GB物理内存。考虑增强spark.yarn.executor.memoryoverhead。

我很困惑为什么我在〜75GB群集上获得〜10.5GB的内存限制(每3m.xlarge实例15GB)...

这是我的EMR配置:

[
 {
  "classification":"spark-env",
  "properties":{
  },
  "configurations":[
     {
        "classification":"export",
        "properties":{
           "PYSPARK_PYTHON":"python34"
        },
        "configurations":[
        ]
     }
  ]
},
{
  "classification":"spark",
  "properties":{
     "maximizeResourceAllocation":"true"
  },
  "configurations":[
  ]
 }
]

根据我阅读的内容,设置maximizeResourceAllocation属性应告诉EMR配置Spark,以充分利用群集上可用的所有资源。即,我应该有〜75GB的内存可用...那么为什么要遇到〜10.5GB内存限制错误?这是我正在运行的代码:

def sessionize(raw_data, timeout):
# https://www.dataiku.com/learn/guide/code/reshaping_data/sessionization.html
    window = (pyspark.sql.Window.partitionBy("user_id", "site_id")
              .orderBy("timestamp"))
    diff = (pyspark.sql.functions.lag(raw_data.timestamp, 1)
            .over(window))
    time_diff = (raw_data.withColumn("time_diff", raw_data.timestamp - diff)
                 .withColumn("new_session", pyspark.sql.functions.when(pyspark.sql.functions.col("time_diff") >= timeout.seconds, 1).otherwise(0)))
    window = (pyspark.sql.Window.partitionBy("user_id", "site_id")
              .orderBy("timestamp")
              .rowsBetween(-1, 0))
    sessions = (time_diff.withColumn("session_id", pyspark.sql.functions.concat_ws("_", "user_id", "site_id", pyspark.sql.functions.sum("new_session").over(window))))
    return sessions
def aggregate_sessions(sessions):
    median = pyspark.sql.functions.udf(lambda x: statistics.median(x))
    aggregated = sessions.groupBy(pyspark.sql.functions.col("session_id")).agg(
        pyspark.sql.functions.first("site_id").alias("site_id"),
        pyspark.sql.functions.first("user_id").alias("user_id"),
        pyspark.sql.functions.count("id").alias("hits"),
        pyspark.sql.functions.min("timestamp").alias("start"),
        pyspark.sql.functions.max("timestamp").alias("finish"),
        median(pyspark.sql.functions.collect_list("foo")).alias("foo"),
    )
    return aggregated
 spark_context = pyspark.SparkContext(appName="process-raw-data")
spark_session = pyspark.sql.SparkSession(spark_context)
raw_data = spark_session.read.csv(sys.argv[1],
                                  header=True,
                                  inferSchema=True)
# Windowing doesn't seem to play nicely with TimestampTypes.
#
# Should be able to do this within the ``spark.read.csv`` call, I'd
# think. Need to look into it.
convert_to_unix = pyspark.sql.functions.udf(lambda s: arrow.get(s).timestamp)
raw_data = raw_data.withColumn("timestamp",
                               convert_to_unix(pyspark.sql.functions.col("timestamp")))
sessions = sessionize(raw_data, SESSION_TIMEOUT)
aggregated = aggregate_sessions(sessions)
aggregated.foreach(save_session)

基本上,只不过是窗口和一个汇总数据的组。

它从其中一些错误开始,朝着停止相同的错误量增加。

我尝试使用>运行Spark-Submit -conf spark.yarn.executor.memoryoverhead ,但这似乎也无法解决问题。

我感到你的痛苦..

我们有类似的问题,即在纱线上使用Spark用完了记忆。我们有五个64GB,16个核心VM,无论将spark.yarn.executor.memoryOverhead设置为什么,我们都无法获得足够的内存来完成这些任务 - 无论我们给他们多少内存,它们最终都会死亡。这是一种相对直接的火花应用程序,导致这种情况发生。

我们弄清楚VM的物理内存使用情况很低,但是虚拟内存使用率非常高(尽管日志抱怨物理内存)。我们将yarn-site.xml中的yarn.nodemanager.vmem-check-enabled设置为false,我们的容器不再被杀死,并且该应用程序似乎可以按预期工作。

做更多的研究,我找到了为什么在这里发生这种情况的答案:http://web.archive.org/web/20190806000138/https:/https://mapr.com/blog/blog/best-practices-practices-yarn-resource-resource-resource-manatage/

由于在CentOS/RHEL 6上,由于OS行为而对虚拟内存进行了积极的分配,因此您应该禁用虚拟内存检查器或增加YARN.NODEMANAGER.VMEM-PMEM-PMEM-RATIO到相对较大的值。

该页面具有来自IBM的非常有用页面的链接linux_glibc_2_10_rhel_6_malloc_may_show_show_excessive_virtual_memory_usage?lang = en

总而言之,glibc>2.10改变了其内存分配。尽管分配了大量的虚拟内存并不是世界的尽头,但它与纱线的默认设置无效。

,而不是将yarn.nodemanager.vmem-check-enabled设置为False ,您也可以使用将MALLOC_ARENA_MAX环境变量设置为hadoop-env.sh中的较低数字。此错误报告具有有关此信息的有用信息:https://issues.apache.org/jira/browse/hadoop-7154

我建议阅读这两个页面 - 信息非常方便。

如果您不使用spark-submit,并且正在寻找指定Duff提到的yarn.nodemanager.vmem-check-enabled参数的另一种方法,这是另外2种方法:

方法2

如果您使用的是JSON配置文件(将其传递到AWS CLI或BOTO3脚本),则必须添加以下配置:

[{
"Classification": "yarn-site", 
  "Properties": {
    "yarn.nodemanager.vmem-check-enabled": "false"
   }
}]

方法3

如果使用EMR控制台,请添加以下配置:

classification=yarn-site,properties=[yarn.nodemanager.vmem-check-enabled=false]

见,

我在一个巨大的集群中也有同样的问题。该问题将无法解决为工人添加内存。有时,在过程中,Spark会使用比它更多的内存,并且Spark作业将开始使用外部内存。

一个简单的示例是:

如果您有一个数据集需要reduceByKey,则有时会比另一个工人中的数据更多的数据,并且如果此数据驱除了一个工人的内存,则会收到该错误消息。

添加选项spark.yarn.executor.memoryOverhead如果您设置了用于工作人员使用的50%的内存(仅用于测试),则可以使用更多的测试来添加少量)。

,但是您需要了解Spark如何与群集中的内存分配一起工作:

  1. 更常见的方式Spark使用了75%的机器内存。其余的是这样。
  2. 在执行过程中,Spark具有两种类型的内存。一个部分是执行,另一部分是存储。执行用于洗牌,连接,聚合等。存储用于缓存和传播群集的数据。

关于内存分配的一件好事,如果您在执行中不使用缓存,则可以将火花设置为使用Sotorage空间以执行执行,以避免部分OOM错误。您可以在Spark的文档中看到这一点:

此设计可确保几种理想的属性。首先,不使用缓存的应用程序可以使用整个空间进行执行,从而避免了不必要的磁盘溢出。其次,使用缓存的应用程序可以保留其数据块不受驱逐的数据块的最低存储空间(R)。最后,这种方法为各种工作负载提供了合理的定期性能,而不需要用户专业知识在内部分配内存。

但是我们如何使用?

您可以更改一些配置,将MemoryOverhead配置添加到您的工作调用中,但请考虑添加此配置:spark.memory.fraction更改为0.8或0.85,并将spark.memory.storageFraction更改为0.35或0.2。

。 。

其他配置可以帮助您,但需要检查您的情况。在这里所有这些配置。

现在,在我的情况下有所帮助。

我有一个带有2.5k工人和2.5TB的RAM的集群。我们像您一样面临OOM错误。我们只是将spark.yarn.executor.memoryOverhead增加到2048年。我们启用了动态分配。当我们打电话给工作时,我们不会为工人设置记忆,我们将其留给火花决定。我们只是设置开销。

但是,对于我的小集群的一些测试,改变了执行和存储内存的大小。解决了问题。

尝试重新分配。在我的情况下,它有效。

数据框在加载write.csv()的一开始就不那么大。数据文件总计为10 MB左右,因此,对于执行程序中的每个处理任务,可能需要说几次100 MB内存。我检查了当时为2的分区数。然后,它在以下操作中与其他桌子连接在一起,添加新列时,它像滚雪球一样长。然后,我在某个步骤中遇到了超过限制问题的内存。我检查了分区的数量,它仍然是2,我猜是从原始数据框架中得出的。因此,我试图从一开始就重新分配它,而且没有问题。

我还没有阅读有关Spark和Yarn的许多材料。我所知道的是节点中有执行者。执行人可以根据资源来处理许多任务。我的猜测是,一个分区将在原子上映射到一个任务。它的音量决定了资源的使用。如果一个分区变得太大,火花将无法切片。

合理的策略是首先确定节点和容器存储器,即10GB或5GB。理想情况下,两者都可以服务于任何数据处理工作,只是时间问题。给定5GB内存设置,您发现的一个分区的合理行是测试后的1000(在处理过程中不会失败),我们可以作为以下伪代码这样做:

RWS_PER_PARTITION = 1000
input_df = spark.write.csv("file_uri", *other_args)
total_rows = input_df.count()
original_num_partitions = input_df.getNumPartitions()
numPartitions = max(total_rows/RWS_PER_PARTITION, original_num_partitions)
input_df = input_df.repartition(numPartitions)

希望它有帮助!

i在Spark 2.3.1上运行相对较小的小群集上遇到了相同的问题。该作业读取镶木quet文件,使用groupby/agg/首先使用groupby/agg删除重复项,然后对新的镶木制进行编写。它在4个节点(4个VCORS,32GB RAM)上处理了51 GB的Parquet文件。

在聚合阶段,工作一直在失败。我写了bash脚本观看执行者的内存使用情况,发现在阶段中间,一个随机执行者开始将双重内存持续几秒钟。当我将这一刻的时间与GC日志相关联时,它与清空大量内存的完整GC匹配。

终于知道问题与GC有关。平行和G1不断引起此问题,但concmarkSweepGC改善了情况。该问题仅带有少量分区。我在安装OpenJDK 64-Bit (build 25.171-b10)的EMR上运行了工作。我不知道问题的根本原因,它可能与JVM或操作系统有关。但这绝对与我的堆或外部使用无关。

update1

尝试的Oracle热点,该问题被复制。

最新更新