我有一个Spark流作业,从Kafka读取并保存到Redshift。
批RDD包含列"groupid"的数据,但是下面的代码不是并发运行forEach,而是在Yarn客户端模式下串行运行。
纱环境:
- 客户端模式 默认调度程序FIFO
- executor Instances 3+
- Executor: 2 Cores, 2gb
inputDstream.foreachRDD { eventRdd: RDD[Event] =>
...
// Convert eventRdd to eventDF
val groupIds = eventDF.select("group_id").distinct.collect.flatMap(_.toSeq)
groupIds.par.foreach{ groupId =>
val teventDF = eventDF.where($"group_id" <=> groupId)
val teventDFWithVersion = teventDF.withColumn("schema_id", lit(version))
teventDFWithVersion.write
.format("io.github.spark_redshift_community.spark.redshift")
.options(opts)
.mode("Append")
.save()
}
}
同样,groupsid .par.foreach中的操作是串行运行的,而不是并行运行的。随着组的增加,我的应用程序开始阻塞,处理时间激增。
如何让Spark同时保存我的批量数据?
驱动程序在m5上运行。大(2个CPU),但只有1个CPU用于驱动程序应用程序,因为其他服务正在占用另一个CPU。
array.par。foreach{}根据可用vcpu的数量并发执行。
运行更多CPU的驱动程序允许更多并发写。
解决方法:在CPU较多的机器上以Client模式运行Spark Driver应用程序。或者使用——driver-cores命令以集群模式运行Spark Application。