在airflow上使用docker操作符装载目录不起作用



我正在尝试使用docker操作符来使用气流自动执行一些脚本。

气流版本:apache-airflow==1.10.12

我想做的是";复制";使用此代码将我项目的所有文件(包括文件夹和文件(添加到容器中。

以下文件ml-intermediate.py在此目录~/airflow/dags/ml-intermediate.py:中

"""
Template to convert a Ploomber DAG to Airflow
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
from ploomber.spec import DAGSpec
from soopervisor.script.ScriptConfig import ScriptConfig
script_cfg = ScriptConfig.from_path('/home/letyndr/airflow/dags/ml-intermediate')
# Replace the project root to reflect the new location - or maybe just
# write a soopervisor.yaml, then we can we rid of this line
script_cfg.paths.project = '/home/letyndr/airflow/dags/ml-intermediate'
# TODO: use lazy_import from script_cfg
dag_ploomber = DAGSpec('/home/letyndr/airflow/dags/ml-intermediate/pipeline.yaml',
lazy_import=True).to_dag()
dag_ploomber.name = "ML Intermediate"
default_args = {
'start_date': days_ago(0),
}
dag_airflow = DAG(
dag_ploomber.name.replace(' ', '-'),
default_args=default_args,
description='Ploomber dag',
schedule_interval=None,
)
script_cfg.save_script()
from airflow.operators.docker_operator import DockerOperator
for task_name in dag_ploomber:
DockerOperator(task_id=task_name,
image="continuumio/miniconda3",
api_version="auto",
auto_remove=True,
# command="sh /home/letyndr/airflow/dags/ml-intermediate/script.sh",
command="sleep 600",
docker_url="unix://var/run/docker.sock",
volumes=[
"/home/letyndr/airflow/dags/ml-intermediate:/home/letyndr/airflow/dags/ml-intermediate:rw",
"/home/letyndr/airflow-data/ml-intermediate:/home/letyndr/airflow-data/ml-intermediate:rw"
],
working_dir=script_cfg.paths.project,
dag=dag_airflow,
container_name=task_name,
)

for task_name in dag_ploomber:
task_ploomber = dag_ploomber[task_name]
task_airflow = dag_airflow.get_task(task_name)
for upstream in task_ploomber.upstream:
task_airflow.set_upstream(dag_airflow.get_task(upstream))
dag = dag_airflow

当我使用Airflow执行这个DAG时,我会得到一个错误,即docker找不到/home/letyndr/airflow/dags/ml-intermediate/script.sh脚本。我更改了docker操作符sleep 600的执行命令,以进入容器并使用正确的路径检查容器中的文件。

例如,当我在容器中时,我可以转到这个路径/home/letyndr/airflow/dags/ml-intermediate/,但我看不到应该在那里的文件。

我试图复制Airflow如何为Python实现Docker SDK检查包Docker操作符Airflow的这一部分,特别是它创建Docker容器的地方:Docker容器创建

这是我对docker实现的一个复制:

import docker
client = docker.APIClient()
# binds = {
#         "/home/letyndr/airflow/dags": {
#             "bind": "/home/letyndr/airflow/dags",
#             "mode": "rw"
#         },
#         "/home/letyndr/airflow-data/ml-intermediate": {
#             "bind": "/home/letyndr/airflow-data/ml-intermediate",
#             "mode": "rw"
#         }
#     }
binds = ["/home/letyndr/airflow/dags:/home/letyndr/airflow/dags:rw",
"/home/letyndr/airflow-data/ml-intermediate:/home/letyndr/airflow-data/ml-intermediate:rw"]
container = client.create_container(
image="continuumio/miniconda3",
command="sleep 600",
volumes=["/home/letyndr/airflow/dags", "/home/letyndr/airflow-data/ml-intermediate"],
host_config=client.create_host_config(binds=binds),
working_dir="/home/letyndr/airflow/dags",
name="simple_example",
)
client.start(container=container.get("Id"))

我发现,只有在设置了host_configvolumes的情况下,装载卷才能工作,问题是Airflow上的实现只设置了host_config,而没有设置volumes。我在方法create_container上添加了参数,它起作用了。

你知道我是否正确地使用了docker运算符,还是这是一个问题?

尝试使用mounts参数而不是volumes。这就是Airflow文档/源代码中对体积的定义。

所以它应该看起来像这样:

DockerOperator(task_id=task_name,
image="continuumio/miniconda3",
api_version="auto",
auto_remove=True,
# command="sh /home/letyndr/airflow/dags/ml-intermediate/script.sh",
command="sleep 600",
docker_url="unix://var/run/docker.sock",
mounts=[
"/home/letyndr/airflow/dags/ml-intermediate:/home/letyndr/airflow/dags/ml-intermediate:rw",
"/home/letyndr/airflow-data/ml-intermediate:/home/letyndr/airflow-data/ml-intermediate:rw"
],
working_dir=script_cfg.paths.project,
dag=dag_airflow,
container_name=task_name,
)

以下是其他一些可能有用的可选参数:

  1. host_tmp_dir:指定将映射到tmp_dir的主机上的临时目录的位置。如果未提供,则默认使用标准系统临时目录。

  2. tmp_dir:容器内的装载点到操作员在主机上创建的临时目录tmp_dir:

编辑:经过进一步审查,我发现每个mount项目必须是docker.types中的Mount类型。参数volumes也被重命名为mount,作为Airflow 2.1的Changelog的一部分。以下是Airflow源代码中的一个示例。

修改后,代码看起来应该类似于

from docker.types import Mount 
...
...
DockerOperator(task_id=task_name,
image="continuumio/miniconda3",
api_version="auto",
auto_remove=True,
# command="sh /home/letyndr/airflow/dags/ml-intermediate/script.sh",
command="sleep 600",
docker_url="unix://var/run/docker.sock",
mounts=[
Mount(
source='/home/letyndr/airflow/dags/ml-intermediate', 
target='/home/letyndr/airflow/dags/ml-intermediate:rw', 
type='bind'
)
],
working_dir=script_cfg.paths.project,
dag=dag_airflow,
container_name=task_name,
)

最新更新