我有一个任务可以从更多内核中受益,但独立调度程序会在只有一个子集可用时启动它。我宁愿在此任务中使用所有群集核心。
有没有办法告诉调度程序在将资源分配给任务之前完成所有工作?换句话说,如果 DAG 在执行任务之前结束所有路径或等到有更多内核可用,则 DAG 更适合此作业。也许是一种暗示任务很胖的方法?我不是也不想经营纱线。
简洁地说:我需要在其他空闲的集群上运行此映射任务,以便它拥有所有资源/核心。有什么办法可以做到这一点吗?即使是一个笨拙的答案也会不胜感激。
有什么想法吗?
动态资源分配可能是您正在寻找的。它根据工作负载上下扩展注册到此应用程序的执行程序的数量。
您可以通过将配置参数传递给 SparkSession 来启用它,例如:
val spark = SparkSession
.builder()
.appName("MyApp")
.config("spark.dynamicAllocation.enabled","true")
.config("spark.shuffle.service.enabled","true")
.getOrCreate()
看到这个: http://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation了解更多详情。
您必须通过 REST API 手动检查 YARN,以查看何时没有应用程序运行。
GET http://<rm http address:port>/ws/v1/cluster/metrics
{
"clusterMetrics":
{
"appsSubmitted":0,
"appsCompleted":0,
"appsPending":0,
"appsRunning":0,
"appsFailed":0,
"appsKilled":0,
"reservedMB":0,
"availableMB":17408,
"allocatedMB":0,
"reservedVirtualCores":0,
"availableVirtualCores":7,
"allocatedVirtualCores":1,
"containersAllocated":0,
"containersReserved":0,
"containersPending":0,
"totalMB":17408,
"totalVirtualCores":8,
"totalNodes":1,
"lostNodes":0,
"unhealthyNodes":0,
"decommissionedNodes":0,
"rebootedNodes":0,
"activeNodes":1
}
}
如果没有挂起或正在运行的应用程序,则可以运行脚本。 我只会创建一个处于 while 循环 + 睡眠中的 shell 脚本,并等待它们都为 0。
您也可以查找可用的内存/内核。 事实上,我会走这条路,这样你就不会总是在等待,你只是保证足够的资源。