在不启动新集群的情况下从Airflow触发Databricks作业



我正在使用气流来触发数据块上的作业。我有很多DAG在运行数据块作业,我不得不只使用一个集群,而不是多个集群,因为据我所知,这将降低这些任务产生的成本。

使用DatabricksSubmitRunOperator有两种方法可以在数据块上运行作业。要么使用正在运行的集群,通过id 调用它

'existing_cluster_id' : '1234-567890-word123',

或者启动一个新的集群

'new_cluster': {
'spark_version': '2.1.0-db3-scala2.11',
'num_workers': 2
},

现在,我想尽量避免为每个任务启动一个新的集群,但集群在停机期间会关闭,因此它将不再通过其id可用,我会得到一个错误,所以我认为唯一的选择是创建一个新集群。

1) 有没有一种方法可以让集群在关闭时也能被id调用?

2) 人们只是保持集群的活力吗?

3) 还是我完全错了,为每个任务启动集群不会产生更多成本?

4) 有什么东西我完全错过了吗?

基于@YannickSSE的评论响应的更新
我不使用数据块;你能用与你可能期望或不期望正在运行的集群相同的id启动一个新集群吗?在它正在运行的情况下,它是一个无操作的集群吗?也许不是,或者你可能不会问这个响应:不,启动新集群时不能提供id。

你能写一个python或bash操作符来测试集群的存在吗?(响应:这将是一个测试作业提交……不是最好的方法。)如果它找到并成功,下游任务将使用现有的集群id触发您的作业,但如果没有,另一个下游任务可以使用trigger_ruleall_failed执行相同的任务,但使用新的集群。那么这两个任务DatabricksSubmitRunOperator可以具有一个具有trigger_ruleone_success的下游任务。(响应:或者使用分支运算符来确定执行的运算符。)

这可能并不理想,因为我认为您的集群id会不时发生变化,导致您不得不跟上…集群是否是该操作员的databricks钩子连接的一部分,以及可以更新的内容?也许您想在需要它的任务中将其指定为{{ var.value.<identifying>_cluster_id }},并将其作为气流变量进行更新。(响应:集群id不在挂钩中,因此无论何时更改变量或DAG文件,都必须更新。)

Databricks最近似乎添加了一个选项,可以在作业中重用作业集群,在任务之间共享它。

https://databricks.com/blog/2022/02/04/saving-time-and-costs-with-cluster-reuse-in-databricks-jobs.html

到目前为止,每个任务都有自己的集群来适应不同类型的工作负载。虽然这种灵活性允许细粒度配置,它也会带来时间和成本并行期间群集启动或利用不足的开销任务。

为了保持这种灵活性,但要进一步提高利用率,我们很高兴地宣布集群重用。通过共享作业多任务集群客户可以减少一项工作的时间通过消除开销和增加集群来降低成本并行任务的利用率。

这似乎在新的API中也可用。https://docs.databricks.com/dev-tools/api/latest/jobs.html#operation/JobsCreate

job_clusters对象数组(JobCluster)<=100件

可以由共享和重用的作业集群规范的列表这项工作的任务。库不能在共享作业中声明簇必须在任务设置中声明依赖库。

为了适应您的用例,您可以用作业启动一个新的集群,在任务之间共享它,最后它会自动关闭。

我仍然不完全理解,如果我们想让作业在没有延迟的情况下启动,我们如何才能一直保持作业集群的热度。我也认为不可能在工作之间共享这些集群。

目前,这些信息应该会提供一个不错的线索。

事实上,当您想通过气流执行笔记本电脑时,您必须指定集群的特性。

databricks会将您的笔记本视为一个新作业,并将其放在您创建的集群上。但当执行完成时,创建的集群将自动删除。

要验证这一点:当作业在气流中运行时==>转到查看日志=>它为您提供了一个链接=>链接将您转发到databricks:在那里您单击View cluster,这样您将看到在一个新创建的集群上执行,例如job-1310-run-980

BLUF:您通常应该为计划的工作流创建新的集群,比如从Airflow编排的工作流。

DatabrickSubmitRunOperator使用jobs/runs/submit端点:

https://docs.databricks.com/dev-tools/api/latest/jobs.html#operation/JobsRunsSubmit

tasks > existing_cluster_id(强调矿)下:

如果existing_cluster_id,则用于此任务的所有运行。在现有集群上运行任务时如果集群停止响应,则可能需要手动重新启动集群我们建议在新集群上运行作业以提高可靠性

创建在气流操作员中配置的新集群时,它会创建一个作业集群,该集群在作业结束时终止。工作集群比通用集群便宜得多,通用集群无限期地存在。

https://www.databricks.com/product/aws-pricing

  1. 有没有一种方法可以让集群在关闭时也能被id调用否,如果通用集群终止,或者您提供了作业集群id,则提交作业运行将失败。

  2. 人们只是保持集群的活力吗为分析、协作和探索工作而创建的所有目的集群只能手动终止

  3. 还是我完全错了,为每个任务启动集群不会产生更多成本作业集群成本<每个DBU的通用集群成本的35%

最新更新