检查RDD中是否存在值



我已经在Python中写了一个Spark Program,该程序正常工作。

但是,在内存消耗和amp方面效率低下我正在尝试优化它。我正在AWS EMR上运行它,而EMR因消耗过多的记忆而杀死了这项工作。

 Lost executor 11 on ip-*****: Container killed by YARN for exceeding memory limits. 11.4 GB of 10.4 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.

我相信这个内存问题是由于我在多个情况下收集了我的rdd(即使用.collect())的事实,因为在以后的阶段,我需要测试列表中是否存在某些值。rdds与否。

所以,目前我的代码看起来像这样:

myrdd = data.map(lambda word: (word,1))     
       .reduceByKey(lambda a,b: a+b)   
       .filter(lambda (a, b): b >= 5) 
       .map(lambda (a,b) : a)          
       .collect()

和以后的代码

if word in myrdd:
    mylist.append(word)
myrdd2 = data2.map(lambda word: (word,1))     
       .reduceByKey(lambda a,b: a+b)   
       .filter(lambda (a, b): b >= 5) 
       .map(lambda (a,b) : a)          
       .collect()
if word in myrdd2:
    mylist2.append(word)

然后我多次重复此模式。

有没有办法进行操作

if word in myrdd: 
    do something

不先收集RDD?

是否有诸如rdd.contains()?

之类的函数

P.S:我没有在记忆中缓存任何东西。我的火花背景看起来像这样:

jobName = "wordcount"
sc = SparkContext(appName = jobName)
......
......
sc.stop()

YARN的错误消息说collect不是问题,因为您的执行者(而不是驱动程序)有内存问题。

首先,尝试遵循错误消息建议并提升spark.yarn.executor.memoryOverhead-在纱线上运行pyspark时,您可以告诉纱线为Python工人分配一个更大的容器来处理内存。

接下来,查看执行者需要大量内存的操作。您使用reduceByKey,也许您可以增加分区数,以使它们在所用的内存方面较小。查看numPartitions参数:http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.rddd.rdd.rdd.rdd.rducebykey

最后,如果要检查RDD是否包含一些值,则只需通过此值过滤并使用countfirst检查它,例如:

looking_for = "....."
contains = rdd.filter(lambda a: a == looking_for).count() > 0

相关内容

最新更新