Spark Streaming应用程序没有并发运行作业



我有一个Spark流作业,从Kafka读取并保存到Redshift。

批RDD包含列"groupid"的数据,但是下面的代码不是并发运行forEach,而是在Yarn客户端模式下串行运行。

纱环境:

  • 客户端模式
  • 默认调度程序FIFO
  • executor Instances 3+
  • Executor: 2 Cores, 2gb
inputDstream.foreachRDD { eventRdd: RDD[Event] =>
...
// Convert eventRdd to eventDF
val groupIds = eventDF.select("group_id").distinct.collect.flatMap(_.toSeq)
groupIds.par.foreach{ groupId =>
val teventDF = eventDF.where($"group_id" <=> groupId)
val teventDFWithVersion = teventDF.withColumn("schema_id", lit(version))
teventDFWithVersion.write
.format("io.github.spark_redshift_community.spark.redshift")
.options(opts)
.mode("Append")
.save()
}
}
同样,groupsid .par.foreach中的操作是串行运行的,而不是并行运行的。随着组的增加,我的应用程序开始阻塞,处理时间激增。

如何让Spark同时保存我的批量数据?

驱动程序在m5上运行。大(2个CPU),但只有1个CPU用于驱动程序应用程序,因为其他服务正在占用另一个CPU。

array.par。foreach{}根据可用vcpu的数量并发执行。

运行更多CPU的驱动程序允许更多并发写。

解决方法:在CPU较多的机器上以Client模式运行Spark Driver应用程序。或者使用——driver-cores命令以集群模式运行Spark Application。