如何按某些顺序处理多个纱线作业



我正在以以下方式提交多个火花作业 -

someCollection.foreach(m => {
    ..some code
    sparkSubmitClass.run(m.name)
    .. some code
})

其中sparksubmitclass.run((方法基本上使用具有$ spark_home/bin/spark-submit和其他相关参数的shell脚本。

问题在于,该代码一口气提交了所有Spark作业。我要实现的是 - 提交工作,然后仅在较早的工作完成时提交另一个工作。这是因为订购了系统策略,下一个作业取决于上一个作业创建的数据。

sparksubmitClass.run((在以下行 -

def run(appName: String)(implicit executionContext: ExecutionContext) = {
val command = s"sparkJob.sh $appName"
val processBuilder = Process(command)
val pio = new ProcessIO(_ => (),
  stdout => {
    scala.io.Source.fromInputStream(stdout)
      .getLines.foreach(str => log.info(s"spark-submit: Application 
       Name=$appName stdout='${str.replace("'", "\'")}'"))
  },
  stderr => {
    val lines = scala.io.Source.fromInputStream(stderr).getLines().toBuffer
    lines.foreach(str => log.info(s"spark-submit: Application Name=$appName 
        stderr='${str.replace("'", "\'")}'"))
    lines.flatMap(parseLineForApplicationUrl).headOption.foreach(appId => 
     appId)
  })
  val process = processBuilder.run(pio)
  val exitVal = process.exitValue() //returns 0 as soon as application is 
  submitted
}

和sparkjob.sh基本上是 -

MAIN_CLASS="com.SomeClassHavingRDDAndHiveOperations"
APPNAME=$1
JAVA_OPTS="-XX:MaxDirectMemorySize=$WORKER_DIRECT_MEM_SIZE -
XX:+HeapDumpOnOutOfMemoryError -Djava.net.preferIPv4Stack=true"
SPARK_HOME="/usr/lib/spark"
cmd='$SPARK_HOME/bin/spark-submit --class $MAIN_CLASS 
--name ${APPNAME}
--conf "spark.yarn.submit.waitAppCompletion=false"
--conf "spark.io.compression.codec=snappy"
--conf "spark.kryo.unsafe=true"
--conf "spark.kryoserializer.buffer.max=1024m"
--conf "spark.serializer=org.apache.spark.serializer.KryoSerializer"
--driver-java-options "-XX:MaxMetaspaceSize=$WORKER_PERM_SIZE $JAVA_OPTS"
$appdir/SomeJar.jar $APPNAME'
eval $cmd

关于如何建立这种订购的任何想法?

而不是编写 bash scripts,并调用每个作业和浪费io/read-write阶段,为什么不循环代码内的作业订购。
这是您的一些提示:
首先,您必须确保您拥有interface,并将该interface实现到每个要处理的class,以便您可以有一个通用的method来开始每个作业。(在此示例中然后,您需要在一个文件中以所需的顺序写入所有class-names-with-package。假设文件是orderedJobs(您不需要提及扩展(

package1.Class1
package1.Class2
package2.Class3
....

阅读并解析该文件。我假设它位于Resouces文件夹中,您可以过滤您不需要的行

val classCall = Source.fromInputStream(getClass.getResourceAsStream(<locationOforderedJobs>)).getLines().filter(!_.startsWith("#"))

foreach循环每个班级,并调用定义的通用方法(process(

classCall.foreach(job => {
    processJob(job).process(<you can pass arguments>)
}

processJobfuntion,您可以在其中实例化每个类

def processJob(name: String): JobInterface = {
    val action = Class.forName("<package path from source root>"+className).newInstance()
    action.asInstanceOf[JobInterface]
  }

这样,您可以减少IO/READ-WRITE时间浪费,通过在内存中存储有用的其他作业数据来提高SPARK处理效率,减少处理时间以及更多...
我希望它有帮助

相关内容

  • 没有找到相关文章

最新更新