Spark Container & Executor OOMs in 'reduceByKey'



I'm running a Spark job on Amazon EMR with YARN, using PySpark, to process data from two input files (200 GB in total).

The job joins the data together (using reduceByKey), performs some maps and filters, and saves the result to S3 in Parquet format. While the job uses DataFrames for the save, all of our actual transformations and actions are performed on RDDs.

Note that I've detailed my current configuration, and the values I've already tried, in the "Configuration" section after the "Failure" section.

Code

The code related to the failure we're seeing occurs in the reduceByKey step. I've included a few lines of context to show one preceding map function and the save operations that actually trigger the reduceByKey on the RDD:

# Populate UC Property Type
united_rdd = united_rdd.map(converter.convert_uc_property_type(uc_property_type_mappings))
# Reduce by listingIdSha
united_rdd = united_rdd.reduceByKey(converter.merge_listings)
# Filter by each geoId and write the output to storage
schema = convert_struct(ListingRevision)
for geo in GEO_NORMALIZATION_ENABLED_GEOS:
    regional_rdd = (united_rdd.filter(lambda (id_sha, (listing_revision, geo_id)): geo_id == geo)
                              .map(lambda (id_sha, (listing_revision, geo_id)): listing_revision))
    regional_df = regional_rdd.map(lambda obj: thrift_to_row(obj, schema)).toDF(schema)
    # Write to Disk/S3
    regional_df.write.format(output_format).mode("overwrite").save(os.path.join(output_dir, geo))
    # Write to Mongo
    (regional_df.write.format("com.mongodb.spark.sql.DefaultSource")
                .option("spark.mongodb.output.uri", mongo_uri)
                .option("collection",
                        "{}_{}".format(geo, config.MONGO_OUTPUT_COLLECTION_SUFFIX))
                .mode("overwrite").save())

Failure

The job fails because executors run out of physical memory. Multiple executors hit this failure, but here is one example, as printed in the EMR step's stderr and shown in the Spark History Server UI:

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2787 in stage 3.0 failed 4 times,
most recent failure: Lost task 2787.3 in stage 3.0 (TID 5792, ip-10-0-10-197.ec2.internal): 
ExecutorLostFailure (executor 47 exited caused by one of the running tasks) 
Reason: Container killed by YARN for exceeding memory limits. 20.0 GB of 20 GB physical memory used. 
Consider boosting spark.yarn.executor.memoryOverhead.
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1890)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1903)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1923)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelationCommand.scala:143)
... 29 more

After discovering this, I dug into the YARN and container logs on individual nodes and found YARN log messages reporting the spike in physical memory usage, as well as java.lang.OutOfMemoryError exceptions in the container logs (both included below, in that order).

Java OutOfMemoryError from the container logs:

17/03/28 21:41:44 WARN TransportChannelHandler: Exception in connection from ip-10-0-10-70.ec2.internal/10.0.10.70:7337
java.lang.OutOfMemoryError: Direct buffer memory
at java.nio.Bits.reserveMemory(Bits.java:693)
at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
at io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:645)
at io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:228)
at io.netty.buffer.PoolArena.allocate(PoolArena.java:212)
at io.netty.buffer.PoolArena.allocate(PoolArena.java:132)
at io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:271)
at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:155)
at io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:146)
at io.netty.buffer.AbstractByteBufAllocator.ioBuffer(AbstractByteBufAllocator.java:107)
at io.netty.channel.AdaptiveRecvByteBufAllocator$HandleImpl.allocate(AdaptiveRecvByteBufAllocator.java:104)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:117)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at java.lang.Thread.run(Thread.java:745)

YARN's report of the excessive physical memory usage:

2017-03-28 21:42:48,986 INFO   org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl (Container Monitor): Memory usage of ProcessTree 6310 for container-id container_1490736006967_0001_01_000015: 20.3 GB of 20 GB physical memory used; 24.9 GB of 100 GB virtual memory used
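(For reference, the 20 GB container limit reported here corresponds to spark.executor.memory + spark.yarn.executor.memoryOverhead = 8 GB + 12 GB from the configuration listed below.)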

In summary, I appear to run out of memory during the shuffle despite allocating more than half of each executor's memory to off-heap space and trying a wide range of executor memory and core settings. Is there anything else I could try? Based on a few other helpful articles I've read (for example), these are the most common culprits for physical memory problems. Could data skew be causing this? I've tried measuring the partition distribution on smaller subsets of the data and it looked normal, but I can't do so for all of this job's data because the job never finishes.
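For reference, a minimal sketch of the kind of skew check described above (united_rdd is the keyed RDD from the code sample; the sample fraction and the number of keys printed are arbitrary):

# Sample the keyed RDD and count records per key to look for dominant keys.
sampled = united_rdd.sample(withReplacement=False, fraction=0.01)
key_counts = sampled.map(lambda kv: (kv[0], 1)).reduceByKey(lambda a, b: a + b)
# Print the 20 heaviest keys; a handful of keys dwarfing the rest indicates skew.
print(key_counts.map(lambda kv: (kv[1], kv[0])).sortByKey(ascending=False).take(20))
# Record counts per partition after the shuffle, to spot oversized partitions.
print(united_rdd.mapPartitions(lambda it: [sum(1 for _ in it)]).collect())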

Configuration

EMR spark-submit command:

spark-submit \
  --deploy-mode client /home/hadoop/src/python/uc_spark/data_sources/realtytrac/assr_hist_extractor.py \
  --dataset_settings development \
  --mongo_uri <Internal Mongo URI> \
  --aws_access_key_id <S3 Access Key> \
  --aws_secret_key <S3 Secret Key> \
  --listing_normalization_server <Internal EC2 Address>:9502

Relevant Spark environment configuration:

spark.executor.memory - 8 GB (out of 20 GB of available memory per executor)
spark.yarn.executor.memoryOverhead - 12 GB
spark.executor.cores - 1 (the lowest value I've tried, in the hope that it would help)
spark.default.parallelism - 1024 (auto-configured based on the other parameters; I've tried 4099 to no avail)
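For anyone reproducing this, a sketch of the equivalent command-line flags if these were passed to spark-submit instead of being set in the cluster's Spark configuration (note that spark.yarn.executor.memoryOverhead takes a value in MB):

spark-submit \
  --conf spark.executor.memory=8g \
  --conf spark.yarn.executor.memoryOverhead=12288 \
  --conf spark.executor.cores=1 \
  --conf spark.default.parallelism=1024 \
  <application and arguments as above>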

I'm running 64 m3.2xlarge machines, for a total of 1.41 TB of memory.

Note: I've tried a wide range of values for every memory parameter except driver memory, with no luck.

Update 1

I refactored my code to combine the two input files with a DataFrame join instead of an RDD union (a rough sketch appears at the end of this update). Once I did this, I made two important discoveries:

A rightOuter join, as opposed to the default leftOuter join, reduces the output size but resolves the problem. Given this, I'm fairly certain we have a small subset of skewed data that the rightOuter join excludes. Unfortunately, I need to investigate further to see whether the excluded data matters; we're still exploring.

Using DataFrames surfaced a clearer failure earlier in the process:

FetchFailed(BlockManagerId(80, ip-10-0-10-145.ec2.internal, 7337), shuffleId=2, mapId=35, reduceId=435, message=
org.apache.spark.shuffle.FetchFailedException: Too large frame: 3095111448
at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:357)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:332)
at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:54)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:117)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:112)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:504)
at org.apache.spark.api.python.PythonRunner$WriterThread$$anonfun$run$3.apply(PythonRDD.scala:328)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1877)
at org.apache.spark.api.python.PythonRunner$WriterThread.run(PythonRDD.scala:269)
Caused by: java.lang.IllegalArgumentException: Too large frame: 3095111448
at org.spark_project.guava.base.Preconditions.checkArgument(Preconditions.java:119)
at org.apache.spark.network.util.TransportFrameDecoder.decodeNext(TransportFrameDecoder.java:133)
at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:81)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
at java.lang.Thread.run(Thread.java:745)
)

We fail during the shuffle because a single partition fetches too much data (a roughly 3 GB "frame", beyond Spark's 2 GB limit on a single shuffle block).

I'll spend the rest of the day exploring how to de-skew the data and whether we can get away with a leftOuter join.
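For context, a rough sketch of the DataFrame-based refactor mentioned above (the paths and read format are illustrative, not the actual job code; the join column follows the listingIdSha key used earlier):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Read the two inputs as DataFrames instead of building RDDs and unioning them.
df_a = spark.read.parquet("s3://bucket/input_a")
df_b = spark.read.parquet("s3://bucket/input_b")

# how="right_outer" completes successfully; how="left_outer" (our original
# semantics) reproduces the shuffle failure described above.
united_df = df_a.join(df_b, on="listingIdSha", how="right_outer")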

Latest Update

In case anyone stumbles upon this, the problem turned out to be caused by data skew. I discovered this by switching the initial combination of the two input files to use a DataFrame join rather than an RDD union. This produced a much more understandable error showing that our shuffle was failing while retrieving data. To fix it, I partitioned the data on an evenly distributed key, and then everything worked.
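A minimal sketch of that fix, assuming the joined DataFrame from the update above (the partition count matches the parallelism used earlier; the column name is only an example of an evenly distributed key):

# Repartition on an evenly distributed key so no single partition (and hence no
# single shuffle fetch) carries a disproportionate share of the data.
united_df = united_df.repartition(1024, "evenly_distributed_key")

# The equivalent on the RDD path would be an explicit partitionBy on the pair RDD:
# united_rdd = united_rdd.partitionBy(1024)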
