我用CLI创建了一个自定义包(使用Click构建)。这个包可以做两件事:运行预处理和运行机器学习模型。我为这个客户包创建了一个Docker镜像,并将其推送到AWS (ECR)上的私有注册表。
现在我想用气流运行这个容器,我想在EC2实例上运行。我用docker-compose来运行它。
对于这个例子,我将只关注一个任务:运行容器进行预处理。
然而,现在我得到'上游失败'的t2。
from datetime import timedelta
import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.docker_operator import DockerOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': airflow.utils.dates.days_ago(1),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
dag = DAG(
'example_pipeline',
default_args=default_args,
description='example data pipeline.',
schedule_interval=timedelta(minutes=3)
)
t1 = BashOperator(
task_id='starting_airflow',
bash_command='echo "Starting Airflow DAG..."',
dag=dag,
)
t2 = DockerOperator(
task_id='data_pipeline',
image='XXXXXXXX.dkr.ecr.eu-central-1.amazonaws.com/rwg:latest',
container_name='task__export_data',
command="run-preprocessing",
network_mode="bridge",
api_version="auto",
docker_url="unix://var/run/docker.sock",
docker_conn_id='aws_con',
dag=dag
)
t1 >> t2
我通过UI创建了'aws_con'。
此外,这是我的docker-compose。yml文件。
version: '3'
services:
postgres:
image: postgres:9.6
environment:
- POSTGRES_USER=airflow
- POSTGRES_PASSWORD=airflow
- POSTGRES_DB=airflow
ports:
- "5432:5432"
volumes:
- ./pgdata:/var/lib/postgresql/data
webserver:
build: .
restart: always
depends_on:
- postgres
environment:
- LOAD_EX=n
- EXECUTOR=Local
- FERNET_KEY=jsDPRErfv8Z_eVTnGfF8ywd19j4pyqE3NpdUBA_oRTo=
volumes:
- ./dags:/usr/local/airflow/dags
ports:
- "8080:8080"
command: webserver
healthcheck:
test: ["CMD-SHELL", "[ -f /usr/local/airflow/airflow-webserver.pid ]"]
interval: 30s
timeout: 30s
retries: 3
关于Docker操作符,我做错了什么?
第二个问题:我如何创建这个"aws_con"通过代码还是cli?
您应该指定docker连接。
这是在气流中传递凭证的标准方式,Docker有一个专用的Docker连接,你可以在气流数据库或秘密中定义,并将连接的id作为docker_conn_id
参数传递给DockerOperator(你也可以在那里指定url,这样你就不需要在你的操作符中传递docker_url)。
查看Python API:
https://airflow.apache.org/docs/apache-airflow-providers-docker/stable/_api/airflow/providers/docker/operators/docker/index.html
关于Docker连接的单独页面在这里(docker_conn_id
描述链接到该页):
https://airflow.apache.org/docs/apache-airflow-providers-docker/stable/connections/docker.html howto-connection-docker