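I am looking for a configuration or parameter that automatically restarts a Spark job submitted through YARN whenever it fails. I know that individual tasks are retried automatically on failure; what I am after is a YARN or Spark setting that re-runs the whole job.
Right now, if any of our jobs aborts for any reason, we have to restart it manually. Because these jobs are built for near-real-time processing, every outage leaves a long backlog of queued data to work through.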
Current configuration:
#!/bin/bash
export SPARK_MAJOR_VERSION=2
# Minimum TODOs on a per job basis:
# 1. define name, application jar path, main class, queue and log4j-yarn.properties path
# 2. remove properties not applicable to your Spark version (Spark 1.x vs. Spark 2.x)
# 3. tweak num_executors, executor_memory (+ overhead), and backpressure settings
# the two most important settings:
num_executors=6
executor_memory=32g
# 3-5 cores per executor is a good default balancing HDFS client throughput vs. JVM overhead
# see http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/
executor_cores=2
# backpressure
receiver_min_rate=1
receiver_max_rate=10
receiver_initial_rate=10
# Every option line below ends with a backslash so the shell treats the whole block as one command.
# The `# ...` notes are wrapped in backticks: they expand to nothing, which lets comments sit inside the command.
/usr/hdp/2.6.1.0-129/spark2/bin/spark-submit --master yarn --deploy-mode cluster \
  --name br1_warid_ccn_sms_production \
  --class com.spark.main \
  --driver-memory 16g \
  --num-executors ${num_executors} --executor-cores ${executor_cores} --executor-memory ${executor_memory} \
  --queue default \
  --files log4j-yarn-warid-br1-ccn-sms.properties \
  --conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=log4j-yarn-warid-br1-ccn-sms.properties" \
  --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=log4j-yarn-warid-br1-ccn-sms.properties" \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer `# Kryo Serializer is much faster than the default Java Serializer` \
  --conf spark.kryoserializer.buffer.max=1g \
  --conf spark.locality.wait=30 \
  --conf spark.task.maxFailures=8 `# Increase max task failures before failing job (Default: 4)` \
  --conf spark.ui.killEnabled=true `# Allow killing of stages and corresponding jobs from the Spark UI (Default: true)` \
  --conf spark.logConf=true `# Log Spark Configuration in driver log for troubleshooting` \
  `# SPARK STREAMING CONFIGURATION` \
  --conf spark.scheduler.mode=FAIR \
  --conf spark.default.parallelism=32 \
  --conf spark.streaming.blockInterval=200 `# [Optional] Tweak to balance data processing parallelism vs. task scheduling overhead (Default: 200ms)` \
  --conf spark.streaming.receiver.writeAheadLog.enable=true `# Prevent data loss on driver recovery` \
  --conf spark.streaming.backpressure.enabled=false \
  --conf spark.streaming.kafka.maxRatePerPartition=${receiver_max_rate} `# [Spark 1.x]: Corresponding max rate setting for Direct Kafka Streaming (Default: not set)` \
  `# YARN CONFIGURATION` \
  --conf spark.yarn.driver.memoryOverhead=4096 `# [Optional] Set if --driver-memory < 5GB` \
  --conf spark.yarn.executor.memoryOverhead=4096 `# [Optional] Set if --executor-memory < 10GB` \
  --conf spark.yarn.maxAppAttempts=4 `# Increase max application master attempts (needs to be <= yarn.resourcemanager.am.max-attempts in YARN, which defaults to 2) (Default: yarn.resourcemanager.am.max-attempts)` \
  --conf spark.yarn.am.attemptFailuresValidityInterval=1h `# Attempt counter considers only the last hour (Default: (none))` \
  --conf spark.yarn.max.executor.failures=$((8 * ${num_executors})) `# Increase max executor failures (Default: max(numExecutors * 2, 3))` \
  --conf spark.yarn.executor.failuresValidityInterval=1h `# Executor failure counter considers only the last hour` \
  --conf spark.speculation=false \
  /home//runscripts/production.jar
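Note: there are a few questions on this topic, but they either have no accepted answer or the answers drift away from the expected solution: "Running Spark application on YARN, without spark-submit" and "How to configure automatic restart of the application driver on Yarn".
This question explores possible solutions from both the YARN and the Spark perspective.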
Just a thought!
Let's call the script file containing the script above run_spark_job.sh.
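Try adding the following statements at the end of the script. In cluster mode, spark-submit only reports the application's final status through its exit code while spark.yarn.submit.waitAppCompletion is true (the default), so leave that setting alone: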
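# Capture spark-submit's exit status; this must come immediately after the spark-submit command
return_code=$?
if [[ ${return_code} -ne 0 ]]; then
  echo "Job failed"
  exit ${return_code}   # propagate the failure to the caller
fi
echo "Job succeeded"
exit 0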
Now create another script, spark_job_runner.sh, that calls the script above and re-runs it as long as it fails. For example:
./run_spark_job.sh
while [ $? -ne 0 ]; do
./run_spark_job.sh
done
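The loop above re-submits immediately and forever, so a job that fails for a persistent reason (bad config, missing input path) will hammer the cluster. A minimal sketch of a bounded variant with a pause between attempts follows; `run_with_retries`, the retry limit, and the delay are hypothetical choices for illustration, not Spark or YARN settings.

```shell
#!/bin/bash
# Hypothetical helper: run a command until it succeeds, at most max_retries times,
# sleeping delay seconds between attempts. Returns the last exit code on give-up.
run_with_retries() {
  local max_retries=$1 delay=$2
  shift 2
  local attempt=1 rc
  while true; do
    "$@"                      # e.g. ./run_spark_job.sh
    rc=$?
    if (( rc == 0 )); then
      echo "Attempt ${attempt} succeeded" >&2
      return 0
    fi
    if (( attempt >= max_retries )); then
      echo "Giving up after ${attempt} attempts (last exit code ${rc})" >&2
      return "$rc"
    fi
    echo "Attempt ${attempt} failed with exit code ${rc}; retrying in ${delay}s" >&2
    sleep "$delay"
    attempt=$((attempt + 1))
  done
}
```

Usage could be `run_with_retries 5 60 ./run_spark_job.sh` in spark_job_runner.sh. Returning the last exit code keeps the wrapper composable, so an outer scheduler or alerting hook can still see that the job ultimately failed.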
YARN-based approach:
Update 1: This link is a good read. It discusses submitting and tracking jobs through the YARN REST API: https://community.hortonworks.com/articles/28070/starting-spark-jobs-directly-via-yarn-rest-api.html
Update 2: This link shows how to submit a Spark application to a YARN environment from Java code: https://github.com/mahmoudparsian/data-algorithms-book/blob/master/misc/how-to-submit-spark-job-to-yarn-from-java-code.md
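A lighter-weight variant of the YARN-based idea, swapped in here instead of the REST API from the links, is a watchdog that polls the `yarn` CLI and re-submits the job only when YARN no longer lists it. The sketch assumes that `yarn application -list -appStates RUNNING,ACCEPTED` prints tab-separated rows with the application name in the second column, and that the name matches the `--name` passed to spark-submit. `is_app_alive` and `ensure_running` are hypothetical helper names.

```shell
#!/bin/bash
# Hypothetical watchdog helpers (illustrative names, not part of Spark or YARN).

# Returns 0 if YARN lists an application with exactly this name as RUNNING or ACCEPTED,
# 1 if it does not, and 2 if the yarn CLI itself failed (e.g. ResourceManager unreachable).
is_app_alive() {
  local app_name=$1 listing
  listing=$(yarn application -list -appStates RUNNING,ACCEPTED 2>/dev/null) || return 2
  # Assumed layout: tab-separated columns, Application-Name in column 2, padded with spaces.
  awk -F'\t' -v name="$app_name" '
    { n = $2; gsub(/^ +| +$/, "", n) }
    n == name { found = 1 }
    END { exit (found ? 0 : 1) }
  ' <<<"$listing"
}

# Runs the given submit command only when the application is confirmed to be missing.
ensure_running() {
  local app_name=$1 status=0
  shift
  is_app_alive "$app_name" || status=$?
  case $status in
    0) return 0 ;;                       # still alive, nothing to do
    1) echo "$(date '+%F %T') ${app_name} not found in YARN; resubmitting" >&2
       "$@" ;;                           # e.g. ./run_spark_job.sh
    *) echo "$(date '+%F %T') could not query YARN; not resubmitting" >&2
       return "$status" ;;
  esac
}
```

Run from cron, something like `ensure_running br1_warid_ccn_sms_production ./run_spark_job.sh` would be a no-op while the job is alive. Checking YARN's view of the application, rather than only the client's exit code, also covers the case where the machine running spark-submit dies; refusing to resubmit when YARN cannot be queried avoids starting duplicates during a ResourceManager outage.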
Spark-based programmatic approach:
How to use the programmatic spark submit capability
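Spark configuration-based approach on YARN:
In YARN mode, the only Spark parameter that restarts the whole application is spark.yarn.maxAppAttempts, and its value should not exceed the YARN ResourceManager setting yarn.resourcemanager.am.max-attempts.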
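Excerpt from the official documentation (https://spark.apache.org/docs/latest/running-on-yarn.html):
Submitting Applications
In YARN mode you can set yarn.resourcemanager.am.max-attempts (default 2) to re-run failed jobs, and raise it as far as you need. Alternatively, you can use Spark's spark.yarn.maxAppAttempts configuration. In cluster deploy mode the driver runs inside the ApplicationMaster, so each new ApplicationMaster attempt restarts the whole application, driver included.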
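Because YARN falls back to its global limit when the per-application value is unset or larger than yarn.resourcemanager.am.max-attempts, the number of attempts you actually get is the smaller of the two. A minimal sketch for checking this, assuming the ResourceManager reads a Hadoop-style yarn-site.xml where each `<name>` and `<value>` sits on its own line or on the same line; `get_site_property` and `effective_am_attempts` are hypothetical helpers, and the awk parsing is a shortcut, not a real XML parser.

```shell
#!/bin/bash
# Hypothetical helper: print the <value> of a property from a Hadoop *-site.xml file.
# Simplistic line-based parsing; empty output means the property is not set in the file.
get_site_property() {
  local file=$1 key=$2
  awk -v key="$key" '
    /<name>/  { n = $0; gsub(/.*<name>|<\/name>.*/, "", n) }
    /<value>/ && n == key { v = $0; gsub(/.*<value>|<\/value>.*/, "", v); print v; exit }
  ' "$file"
}

# Effective AM attempts: YARN uses its global cap when the Spark-side value is
# non-positive or larger than the cap (cap defaults to 2 when not configured).
effective_am_attempts() {
  local spark_attempts=${1:-0} yarn_max=${2:-2}
  if (( spark_attempts > 0 && spark_attempts < yarn_max )); then
    echo "$spark_attempts"
  else
    echo "$yarn_max"
  fi
}
```

For example, `effective_am_attempts 4 "$(get_site_property /etc/hadoop/conf/yarn-site.xml yarn.resourcemanager.am.max-attempts)"` (the config path is an assumption; it varies by distribution) prints 2 on a cluster that leaves the YARN setting at its default, which means the spark.yarn.maxAppAttempts=4 in the script above would only buy two attempts. For a long-running streaming job, spark.yarn.am.attemptFailuresValidityInterval=1h matters as much as the count: failures older than the interval are forgotten, so the job can be restarted again later instead of exhausting its attempts over its lifetime.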