我正在Spark(v2+(中寻找一种可靠的方法,以编程方式调整会话中执行器的数量。
我知道动态分配和在创建会话时配置spark执行器的能力(例如使用--num-executors
(,但由于我的spark作业的性质,这两个选项对我都不太有用。
我的星火工作
该作业对大量数据执行以下步骤:
- 对数据执行一些聚合/检查
- 将数据加载到Elasticsearch中(ES集群通常比Spark集群小得多(
问题
- 如果我使用全套可用的Spark资源,我会非常快速过载Elasticsearch,甚至可能颠覆Elasticsearch节点
- 如果我使用数量足够少的火花执行器,以免不堪重负Elasticsearch,步骤1花费的时间比需要的时间长很多(因为它有可用火花资源的小%(
我很感激我可以将此作业拆分为两个作业,这两个作业使用不同的Spark资源配置文件分别执行,但我真正想要的是在我的Spark脚本中的特定点(在Elasticsearch加载开始之前(以编程方式将执行器的数量设置为X。一般来说,这似乎是一件有用的事情。
我的初次尝试
我试着改变设置,发现了一些有点有效的东西,但这感觉像是一种黑客的方式,应该以更标准化和更受支持的方式来做一些事情。
我的尝试(这只是我在玩(:
def getExecutors = spark.sparkContext.getExecutorStorageStatus.toSeq.map(_.blockManagerId).collect {
case bm if !bm.isDriver => bm
}
def reduceExecutors(totalNumber: Int): Unit = {
//TODO throw error if totalNumber is more than current
logger.info(s"""Attempting to reduce number of executors to $totalNumber""")
spark.sparkContext.requestTotalExecutors(totalNumber, 0, Map.empty)
val killedExecutors = scala.collection.mutable.ListBuffer[String]()
while (getExecutors.size > totalNumber) {
val executorIds = getExecutors.map(_.executorId).filterNot(killedExecutors.contains(_))
val executorsToKill = Random.shuffle(executorIds).take(executorIds.size - totalNumber)
spark.sparkContext.killExecutors(executorsToKill)
killedExecutors ++= executorsToKill
Thread.sleep(1000)
}
}
def increaseExecutors(totalNumber: Int): Unit = {
//TODO throw error if totalNumber is less than current
logger.info(s"""Attempting to increase number of executors to $totalNumber""")
spark.sparkContext.requestTotalExecutors(totalNumber, 0, Map.empty)
while (getExecutors.size < totalNumber) {
Thread.sleep(1000)
}
}
您可以尝试调用
val dfForES = df.coalesce(numberOfParallelElasticSearchUploads)
在步骤#2之前。这将减少分区的数量,而不会产生混洗开销,并确保只有max-numberOfParallelElasticSearchUploads执行器并行向ES发送数据,而其他执行器处于空闲状态。
如果您在共享集群上运行作业,我仍然建议启用动态分配来释放这些空闲的执行器,以获得更好的资源利用率。
我一直在寻找一种通过编程调整pyspark中执行器数量的方法,这是最好的结果。以下是我从Will的问题和py4j:中收集到的信息
# Create the spark session:
from pyspark.sql import SparkSession
spark = SparkSession.builder.config(... your configs ...).getOrCreate()
# Increase cluster to 5 executors:
spark._jsparkSession.sparkContext().requestTotalExecutors(5, 0, sc._jvm.PythonUtils.toScalaMap({}))
# Decrease cluster back to zero executors:
spark._jsparkSession.sparkContext().requestTotalExecutors(0, 0, sc._jvm.PythonUtils.toScalaMap({}))
javaExecutorIds = spark._jsparkSession.sparkContext().getExecutorIds()
executorIds = [javaExecutorIds.apply(i) for i in range(javaExecutorIds.length())]
print(f'Killing executors {executorIds}')
spark._jsparkSession.sparkContext().killExecutors(javaExecutorIds)
我希望这能让其他人免于过度的谷歌搜索。