如何在Apache气流中混合芹菜遗嘱执行人和kubernetes executor



我使用芹菜执行程序有多个DAG,但我希望使用Kubernetes executor运行一个特定的DAG。我无法推断出一种良好而可靠的方法来实现这一目标。

我有一个airflow.cfg,我在其中声明了要使用CeleryExecutor。而且我不想更改它,因为它在所有DAG中确实需要它。

# The executor class that airflow should use. Choices include
# SequentialExecutor, LocalExecutor, CeleryExecutor
executor = CeleryExecutor

我的DAG代码:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.contrib.operators.kubernetes_pod_operator import 
    KubernetesPodOperator
from airflow.operators.dummy_operator import DummyOperator
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime.utcnow(),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}
dag = DAG(
    'kubernetes_sample_1', default_args=default_args)

start = DummyOperator(task_id='run_this_first', dag=dag)
passing = KubernetesPodOperator(namespace='default',
                                image="Python:3.6",
                                cmds=["Python", "-c"],
                                arguments=["print('hello world')"],
                                labels={"foo": "bar"},
                                name="passing-test",
                                task_id="passing-task",
                                get_logs=True,
                                dag=dag
                                )
failing = KubernetesPodOperator(namespace='default',
                                image="ubuntu:1604",
                                cmds=["Python", "-c"],
                                arguments=["print('hello world')"],
                                labels={"foo": "bar"},
                                name="fail",
                                task_id="failing-task",
                                get_logs=True,
                                dag=dag
                                )
passing.set_upstream(start)
failing.set_upstream(start)

我可以放置一个IF-ELSE条件,然后从气流拾取配置的点更改值。如果这听起来不错,请告诉我路径和文件。尽管我希望得到一个更成熟的方法,但如果存在。

现在有celerykubernetesexecutor(看不到何时完全介绍它(,它需要设置芹菜和kubernetes,但也提供了两者的功能。

在官方文档中,他们提供了一个经验法则来决定何时值得使用:

我们建议您在使用时考虑芹菜素案例聚会:

在峰值上安排的任务数量超过扩展您的Kubernetes群集可以舒适地处理

您的任务的相对小部分需要运行时隔离。

您有很多可以在芹菜工人上执行的小任务但是您也有渴望资源的任务,最好运行预定义的环境。

启动气流2.x配置airflow.cfg如下:在[core]中,部分集executor = CeleryKubernetesExecutor[celery_kubernetes_executor]中的部分集kubernetes_queue = kubernetes。因此,每当您想在Kubernetes执行程序中运行任务实例时,请在任务定义中添加参数queue = kubernetes。例如

task1= BashOperator(
        task_id='Test_kubernetes_executor',
        bash_command='echo Kubernetes',
        queue = 'kubernetes'
    )
task2 = BashOperator(
        task_id='Test_Celery_Executor',
        bash_command='echo Celery',
    )

运行DAG时,您会看到在K8S中运行的Task1和芹菜中的Task2。因此,除非您将队列写为kubernetes,否则所有DAG都将在芹菜遗嘱执行人

上运行

我认为不能使用两个执行者。但是,您可以只使用CeleryExecutor,但是使用Kubernetespoderator声明资源密集型任务,而解决问题的作业则由CeleryExecutor安排/观看,并由Kubernetes运行,以在任务中进行实际处理逻辑。

相关内容

  • 没有找到相关文章

最新更新