我有一个4节点的Spark Standalone集群,上面运行着一个Spark流作业。
当我提交每个执行器有7个核心的作业时,一切都运行顺利:
spark-submit --class com.test.StreamingJob --supervise --master spark://{SPARK_MASTER_IP}:7077 --executor-memory 30G --executor-cores 7 --total-executor-cores 28 /path/to/jar/spark-job.jar
当我将每个执行器增加到24个内核时,没有一个批得到处理,并且我看到java.lang.OutOfMemoryError:无法在执行器日志中创建新的本机线程。执行者不断失败:
spark-submit --class com.test.StreamingJob --supervise --master spark://{SPARK_MASTER_IP}:7077 --executor-memory 30G --executor-cores 24 --total-executor-cores 96 /path/to/jar/spark-job.jar
错误:
17/01/12 16:01:00 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Shutdown-checker,5,main]
java.lang.OutOfMemoryError: unable to create new native thread
at java.lang.Thread.start0(Native Method)
at java.lang.Thread.start(Thread.java:714)
at io.netty.util.concurrent.SingleThreadEventExecutor.shutdownGracefully(SingleThreadEventExecutor.java:534)
at io.netty.util.concurrent.MultithreadEventExecutorGroup.shutdownGracefully(MultithreadEventExecutorGroup.java:146)
at io.netty.util.concurrent.AbstractEventExecutorGroup.shutdownGracefully(AbstractEventExecutorGroup.java:69)
at com.datastax.driver.core.NettyOptions.onClusterClose(NettyOptions.java:190)
at com.datastax.driver.core.Connection$Factory.shutdown(Connection.java:844)
at com.datastax.driver.core.Cluster$Manager$ClusterCloseFuture$1.run(Cluster.java:2488)
我发现了这个问题,并试图大幅提高ulimits,但没有效果。
每个盒子有32个核心和61.8 GB的内存。流式作业是用java编写的,运行在Spark 2.0.0上,使用Spark-Cassandra connector-java_2.10 1.5.0-M2连接到Cassandra 3.7.0。
数据是每秒不到100个事件的非常小的涓涓细流,每个事件都不到200B。
听起来像是内存不足;)。
更详细地说,Spark使用的内核数量与并行处理的信息量直接相关。您基本上可以将每个Core视为处理完整的Spark分区的数据,并且可能需要将完整的数据驻留在内存中。
每个执行器7个内核意味着同时处理7个Spark分区。将这个数字提高到24意味着将使用大约4倍的ram。这很容易在不同的地方造成OOM。
有几种方法可以解决这个问题。
- 为执行器JVM分配更多内存
- 缩小Spark分区的大小(更小的分区意味着在任何给定时间内存中的数据都更少)
- 确保您没有在内存中缓存任何RDD(从而耗尽系统资源)
- 减少您正在处理的数据量,获取子集,或者在启动之前尝试在服务器上进行筛选