我对Spark还很陌生,但我已经能够创建Spark应用程序了。我需要能够使用JDBC驱动程序重新处理SQL Server中的数据(我们正在删除昂贵的SP),该应用程序通过JDBC将SQL Server的一些表加载到数据帧中,然后我进行一些联接、一个组和一个过滤器,最后通过JDBC将一些数据重新插入到另一个表中。所有这些在亚马逊Web服务中的Spark EMR上执行得很好,大约一分钟内就可以在一个有两个核心的m3.xlarg中执行。
我的问题如下:1.现在我在集群上有1个主机和2个核心,但每次我启动一个新步骤时,从历史服务器上可以看到,似乎只有1个执行器在使用,因为我可以看到列出了2个执行器,驱动程序根本没有使用,一个id为1的执行器处理大约1410个任务。我完全不确定该怎么办。
这也是AWS特有的,但我不想发布两个问题,因为它们在某种程度上是相关的,有没有办法让我同时运行两个步骤?这意味着能够同时运行该进程的两个spark提交,因为我们每天运行该进程很多次(它处理客户端数据)。我知道我可以用这个步骤启动一个新的集群,但我希望能够快速完成处理,而仅仅启动新集群就需要太长时间。谢谢
对于您的第一个问题:
我不确定情况是否如此,但类似的事情也发生在我们身上,也许这会有所帮助。
如果使用sqlContext.read.format("jdbc").load()
(或类似方法)从JDBC源读取数据,则默认情况下不会对生成的数据帧进行分区。因此,如果你是这样的话,在不首先对其进行分区的情况下对生成的数据帧应用转换将导致只有一个执行器能够处理它。如果你不是这样,下面的解决方案可能无法解决你的问题。
因此,我们的解决方案是在数据中创建一个数值为1到32(我们想要的分区数)的数字列,并通过设置jdbc读取器的分区选项将其用作分区列(请查看此链接):
val connectionOptions = Map[String, String] (... <connection options> ...)
val options = connectionOptions ++ Map[String, String] (
"partitionColumn" -> "column name",
"lowerBound" -> "1",
"upperBound" -> "32",
"numPartitions" -> "32"
)
val df = sqlContext.read.format("jdbc").options(options).load()
因此,使用这种方法,不仅可以并行处理读取任务(真正提高性能并避免OOM错误),而且可以对生成的数据帧进行分区并并行处理,以进行所有后续转换。
我希望这能有所帮助。