背景信息:使用Kubernetes Executor运行Airflow v2.2.2 .
我正在运行一个进程,它创建了一系列快速任务。
这些任务足够短,以至于Kubernetes Pod初始化占用了其中许多任务的大部分运行时间。
是否有一种方法可以重新利用预先初始化的pod用于多个任务?
我在一个旧问题中发现了一个评论,该评论指出,当运行Subdag操作符时,所有子任务将在同一pod上运行,但我找不到任何进一步的信息。这是真的吗?
我搜索了以下资源:
- 气流文档:Kubernetes
- 气流文档:KubernetesExecutor
- 气流文档:KubernetesPodOperator
- StackOverflow线程:在同一个pod上运行两个作业,在Kube上运行最佳气流
- Google Search:
airflow kubernetes reuse initialization
但是还没有真正找到任何直接解决我的问题的东西。
我认为这在airflow中是不可能的,即使有Subdag操作符,它运行一个单独的dag作为当前dag的一部分,使用与其他任务相同的方式。
为了解决你的问题,你可以使用CeleryKubernetesExecutor,它结合了CeleryExecutor和KubernetesExecutor。默认情况下,任务将在芹菜队列中排队,但对于繁重的任务,您可以选择K8S队列,以便在独立的pod中运行它们。在这种情况下,您将能够使用一直处于启动状态的Celery worker。
kubernetes_task= BashOperator(
task_id='kubernetes_executor_task',
bash_command='echo "Hello from Kubernetes"',
queue = 'kubernetes'
)
celery_task = BashOperator(
task_id='celery_executor_task',
bash_command='echo "Hello from Celery"',
)
如果你担心芹菜工作线程的可伸缩性/成本,你可以使用KEDA将芹菜工作线程从0个工作线程扩展到基于队列任务数的最大工作线程数。