在我的Spark应用程序中,我试图从RDBMS读取多个表,做一些数据处理,然后将多个表写入另一个RDBMS,如下所示(在Scala中):
val reading1 = sqlContext.load("jdbc", Map("url" -> myurl1, "dbtable" -> mytable1))
val reading2 = sqlContext.load("jdbc", Map("url" -> myurl1, "dbtable" -> mytable2))
val reading3 = sqlContext.load("jdbc", Map("url" -> myurl1, "dbtable" -> mytable3))
// data processing
// ..............
myDF1.write.mode("append").jdbc(myurl2, outtable1, new java.util.Properties)
myDF2.write.mode("append").jdbc(myurl2, outtable2, new java.util.Properties)
myDF3.write.mode("append").jdbc(myurl2, outtable3, new java.util.Properties)
我明白从一个表中读取可以使用分区并行。然而,reading1、reading2、reading3的读操作似乎是顺序的,myDF1、myDF2、myDF3的写操作也是顺序的。
我如何从多个表(mytable1, mytable2, mytable3)并行读取?也写多个表并行(我认为相同的逻辑)?
您可以将模式调度为FAIR,它应该并行运行任务。https://spark.apache.org/docs/latest/job-scheduling.html scheduling-within-an-application
在应用程序内调度在给定的Spark应用程序(SparkContext实例)中,如果多个并行作业是从单独的线程提交的,它们可以同时运行。在本节中,我们所说的"job"是指Spark操作(例如保存、收集)和任何需要运行以评估该操作的任务。Spark的调度程序是完全线程安全的,并支持这个用例,使应用程序能够服务多个请求(例如多个用户的查询)。
默认情况下,Spark的调度程序以FIFO的方式运行作业。每个作业被划分为"阶段"(例如map和reduce阶段),第一个作业在所有可用资源上获得优先级,而其阶段有任务要启动,然后第二个作业获得优先级,依此类推。如果队列头部的作业不需要使用整个集群,那么后面的作业可以立即开始运行,但是如果队列头部的作业很大,那么后面的作业可能会明显延迟。
从Spark 0.8开始,也可以在作业之间配置公平共享。在公平共享下,Spark以"循环"的方式在作业之间分配任务,这样所有作业都可以获得大致相等的集群资源份额。这意味着在运行长作业时提交的短作业可以立即开始接收资源,并且仍然可以获得良好的响应时间,而无需等待长作业完成。此模式最适合多用户设置。
val conf = new SparkConf().setMaster(...).setAppName(...)
conf.set("spark.scheduler.mode", "FAIR")
val sc = new SparkContext(conf)