我正在以以下方式提交多个火花作业 -
someCollection.foreach(m => {
..some code
sparkSubmitClass.run(m.name)
.. some code
})
其中sparksubmitclass.run((方法基本上使用具有$ spark_home/bin/spark-submit和其他相关参数的shell脚本。
问题在于,该代码一口气提交了所有Spark作业。我要实现的是 - 提交工作,然后仅在较早的工作完成时提交另一个工作。这是因为订购了系统策略,下一个作业取决于上一个作业创建的数据。
sparksubmitClass.run((在以下行 -
def run(appName: String)(implicit executionContext: ExecutionContext) = {
val command = s"sparkJob.sh $appName"
val processBuilder = Process(command)
val pio = new ProcessIO(_ => (),
stdout => {
scala.io.Source.fromInputStream(stdout)
.getLines.foreach(str => log.info(s"spark-submit: Application
Name=$appName stdout='${str.replace("'", "\'")}'"))
},
stderr => {
val lines = scala.io.Source.fromInputStream(stderr).getLines().toBuffer
lines.foreach(str => log.info(s"spark-submit: Application Name=$appName
stderr='${str.replace("'", "\'")}'"))
lines.flatMap(parseLineForApplicationUrl).headOption.foreach(appId =>
appId)
})
val process = processBuilder.run(pio)
val exitVal = process.exitValue() //returns 0 as soon as application is
submitted
}
和sparkjob.sh基本上是 -
MAIN_CLASS="com.SomeClassHavingRDDAndHiveOperations"
APPNAME=$1
JAVA_OPTS="-XX:MaxDirectMemorySize=$WORKER_DIRECT_MEM_SIZE -
XX:+HeapDumpOnOutOfMemoryError -Djava.net.preferIPv4Stack=true"
SPARK_HOME="/usr/lib/spark"
cmd='$SPARK_HOME/bin/spark-submit --class $MAIN_CLASS
--name ${APPNAME}
--conf "spark.yarn.submit.waitAppCompletion=false"
--conf "spark.io.compression.codec=snappy"
--conf "spark.kryo.unsafe=true"
--conf "spark.kryoserializer.buffer.max=1024m"
--conf "spark.serializer=org.apache.spark.serializer.KryoSerializer"
--driver-java-options "-XX:MaxMetaspaceSize=$WORKER_PERM_SIZE $JAVA_OPTS"
$appdir/SomeJar.jar $APPNAME'
eval $cmd
关于如何建立这种订购的任何想法?
而不是编写 bash scripts
,并调用每个作业和浪费io/read-write阶段,为什么不循环代码内的作业订购。
这是您的一些提示:
首先,您必须确保您拥有interface
,并将该interface
实现到每个要处理的class
,以便您可以有一个通用的method
来开始每个作业。(在此示例中然后,您需要在一个文件中以所需的顺序写入所有class-names-with-package
。假设文件是orderedJobs
(您不需要提及扩展(
package1.Class1
package1.Class2
package2.Class3
....
阅读并解析该文件。我假设它位于Resouces文件夹中,您可以过滤您不需要的行
val classCall = Source.fromInputStream(getClass.getResourceAsStream(<locationOforderedJobs>)).getLines().filter(!_.startsWith("#"))
用foreach
循环每个班级,并调用定义的通用方法(process
(
classCall.foreach(job => {
processJob(job).process(<you can pass arguments>)
}
processJob
是funtion
,您可以在其中实例化每个类
def processJob(name: String): JobInterface = {
val action = Class.forName("<package path from source root>"+className).newInstance()
action.asInstanceOf[JobInterface]
}
这样,您可以减少IO/READ-WRITE时间浪费,通过在内存中存储有用的其他作业数据来提高SPARK处理效率,减少处理时间以及更多...
我希望它有帮助