如何在spark中触发2个写操作



我基本上有2个保存动作在我的数据框架上执行。工作进行得很顺利。但是当我在Spark UI中看到事件时间轴时,我明白了首先动作1完成,然后动作2开始并完成。

由于这两个动作是相互独立的,是否有办法将它们一起执行。下面是我的代码。

processedDF.write.format("ORC").options(Map("path" ->
integrationFullPath)).mode(SaveMode.Overwrite).saveAsTable(HIVE_SCHEMA + "."
+ hiveTableName + "_int")
errorDF.write.format("ORC").options(Map("path" ->
errorFullPath)).mode(SaveMode.Overwrite).saveAsTable(HIVE_SCHEMA + "." +
hiveTableName + "_error")

我想同时处理"processDF"one_answers"errorDF"写入HDFS。

你可以在不同的线程中启动它们:

   new Thread() {
      override def run(): Unit = {
       processedDF.write.format("ORC").options(Map("path" ->
integrationFullPath)).mode(SaveMode.Overwrite).saveAsTable(HIVE_SCHEMA + "."
+ hiveTableName + "_int")
      }
    }.start()

    new Thread() {
          override def run(): Unit = {
           errorDF.write.format("ORC").options(Map("path" ->
errorFullPath)).mode(SaveMode.Overwrite).saveAsTable(HIVE_SCHEMA + "." +
hiveTableName + "_error")
          }
        }.start()

请参考这里的调度文档。逐字复制相关部分:

在给定的Spark应用程序(SparkContext实例)中,如果多个并行作业从单独的线程提交,则可以同时运行。默认情况下,Spark的调度程序以FIFO的方式运行作业。每个作业被划分为"阶段"(例如map和reduce阶段),第一个作业在所有可用资源上获得优先级,而它的阶段有任务要启动,然后第二个作业获得优先级,等等。从Spark 0.8开始,也可以在作业之间配置公平共享。在公平共享下,Spark以"循环"的方式在作业之间分配任务,这样所有作业都可以获得大致相等的集群资源份额。这意味着在运行长作业时提交的短作业可以立即开始接收资源,并且仍然可以获得良好的响应时间,而无需等待长作业完成。

要启用公平调度程序,只需在配置SparkContext时将spark.scheduler.mode属性设置为fair:

val conf = new SparkConf().setMaster(...).setAppName(...)
conf.set("spark.scheduler.mode", "FAIR")
val sc = new SparkContext(conf)

如果有人还在寻找一个解决方案,我做了同样的使用未来

@volatile var terminated = false
    import scala.concurrent._
    import ExecutionContext.Implicits.global
    
    val f1= Future{processedDF.write.format("ORC").options(Map("path" ->
integrationFullPath)).mode(SaveMode.Overwrite).saveAsTable(HIVE_SCHEMA + "."
+ hiveTableName + "_int")}
    val f2= Future{errorDF.write.format("ORC").options(Map("path" ->
errorFullPath)).mode(SaveMode.Overwrite).saveAsTable(HIVE_SCHEMA + "." +
hiveTableName + "_error")}
    val aggregatedFuture = for{
      f1Result <- f1
      f2Result <- f2
    } yield (f1Result, f2Result)
    Thread.sleep(500) 
    terminated = true
    val res = Await.result(aggregatedFuture, Duration.Inf)

相关内容

  • 没有找到相关文章

最新更新