在气流的python_callable内执行SSH连接?



在Airflow 1.10.10 DAG中,我们有一个ShortCircuitOperator,它使用python函数check_remote_server()来决定分支。

check_remote_server_data()函数中,我们如何启动与远程服务器的SSH连接,在其上运行bash命令并获得结果?

是否可以使用我之前使用Web UI定义的Airflow SSH连接?

def check_remote_server_data():
pass   # how can we use a predefined Airflow SSH connection, named `remote`?
with dag:
shortcircuitop = ShortCircuitOperator(
task_id='shortcircuitop',
python_callable=check_remote_server_data,
dag=dag
)

我只能使用SSHOperator进行,但我需要使用结果来确定短路条件:

SSHOperator(
task_id='sshop',
ssh_conn_id='remote',
command='date +%F',
dag=dag)

SSHOperator支持参数do_xcom_push,以XCom值的形式传递命令的输出,您可以在ShortCircuitOperator:中访问该值

def check_remote_server_data(**context):
xcom_stdout = context["task_instance"].xcom_pull(task_ids="ssh_task_id")
with dag:
ssh_operator = SSHOperator(task_id="ssh_task_id", do_xcom_push=True, ...)
shortcircuitop = ShortCircuitOperator(
task_id='shortcircuitop',
python_callable=check_remote_server_data,
provide_context=True,
dag=dag
)
ssh_operator >> shortcircuitop >> [other_task0, other_task1, ...]

最新更新