我们有几个应用程序跨越多个AWS区域。因此,与其使用多个部署的气流来处理ETL任务(每个区域一个),我们更想弄清楚是否有一种方法可以在不同的区域/集群/命名空间中使用worker。
我们的气流部署在EKS中运行,所以我猜这可能是KubernetesPodOperator中的设置。我也没有看到通过DAG指定集群的方法,但我希望这里的一些天才可能有一个想法。
提前感谢,比尔
在我工作的公司,我们使用KubernetesPodOperator在不同的命名空间中运行。
KubernetesPodOperator有一个名为'namespace'的参数。通过此参数,可以确定作业将在哪个命名空间中运行。
write_xcom = KubernetesPodOperator(
namespace='default',
image='alpine',
cmds=["sh", "-c", "mkdir -p /airflow/xcom/;echo '[1,2,3,4]' > /airflow/xcom/return.json"],
name="write-xcom",
do_xcom_push=True,
is_delete_operator_pod=True,
in_cluster=True,
task_id="write-xcom",
get_logs=True,
)
当我在不同的集群中搜索工作时,我看到KubernetesPodOperator在最后一个稳定版本中有一个名为"config_file"的参数。这个值被设置为"~/"。默认为Kube/config'。
链接:https://airflow.apache.org/docs/apache-airflow-providers-cncf-kubernetes/stable/operators.html
我以前没有尝试过,但是通过创建不同的配置文件,可以使用'config_file'参数在不同的集群中工作。我将遵循不同的更好的解决方案。
我遇到了一个示例解决方案如下
from datetime import datetime, timedelta
from airflow import DAG
from airflow.configuration import conf
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2022, 1, 1),
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
namespace = conf.get('kubernetes', 'NAMESPACE')
# This will detect the default namespace locally and read the
# environment namespace when deployed to Astronomer.
if namespace =='default':
config_file = '/usr/local/airflow/include/.kube/config'
in_cluster = False
else:
in_cluster = True
config_file = None
dag = DAG('example_kubernetes_pod', schedule_interval='@once', default_args=default_args)
with mountain:
KubernetesPodOperator(
namespace=namespace,
image="hello-world",
labels={"<pod-label>": "<label-name>"},
name="airflow-test-pod",
task_id="task-one",
in_cluster=in_cluster, # if set to true, will look in the cluster, if false, looks for file
cluster_context="docker-desktop", # is ignored when in_cluster is set to True
config_file=config_file,
is_delete_operator_pod=True,
get_logs=True,
)
详细信息可参考此链接:https://docs.astronomer.io/software/kubepodoperator-local