从Apache气流中启动一个Docker容器,而不创建一个



我使用docker-compose启动了一个正在运行的Apache气流。我也有一个docker-compose项目,我想与气流管理。

example_workers/docker-compose.yml:

version: '3'
services:
my_app:
build:
context: ./my_app/
environment:
TEST: 1
INDOCKER: 1
restart: "no"
volumes:
- ./my_app:/my_app
depends_on:
- redis
redis:
build:
context: ./redis/.
restart: always
volumes:
- ./redis/data:/data

更具体地说:我想在我的项目中运行docker-compose build && docker-compose up -d。Redis启动并继续运行,my_app运行一次然后停止。之后,我想要开始有气流的my_app容器

但是DockerOperator似乎总是创建

from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.docker.operators.docker import DockerOperator
dag = DAG(
'my_app',
default_args={'retries': 0},
schedule_interval=timedelta(minutes=10),
start_date=datetime(2021, 1, 1),
catchup=False,
)
t0 = DockerOperator(
docker_url='unix:///var/run/docker.sock',
image='example_workers_my_app',
container_name='example_workers_my_app_1',
task_id='docker_op_tester',
dag=dag,
mount_tmp_dir=False
)

日志:

[2022-05-19, 18:15:17 UTC] {taskinstance.py:1889} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.7/site-packages/docker/api/client.py", line 268, in _raise_for_status
response.raise_for_status()
File "/home/airflow/.local/lib/python3.7/site-packages/requests/models.py", line 960, in raise_for_status
raise HTTPError(http_error_msg, response=self)
requests.exceptions.HTTPError: 409 Client Error: Conflict for url: http+docker://localhost/v1.39/containers/create?name=example_workers_my_app_1
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/providers/docker/operators/docker.py", line 387, in execute
return self._run_image()
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/providers/docker/operators/docker.py", line 266, in _run_image
return self._run_image_with_mounts(self.mounts, add_tmp_variable=False)
File "/home/airflow/.local/lib/python3.7/site-packages/airflow/providers/docker/operators/docker.py", line 298, in _run_image_with_mounts
tty=self.tty,
File "/home/airflow/.local/lib/python3.7/site-packages/docker/api/container.py", line 428, in create_container
return self.create_container_from_config(config, name)
File "/home/airflow/.local/lib/python3.7/site-packages/docker/api/container.py", line 439, in create_container_from_config
return self._result(res, True)
File "/home/airflow/.local/lib/python3.7/site-packages/docker/api/client.py", line 274, in _result
self._raise_for_status(response)
File "/home/airflow/.local/lib/python3.7/site-packages/docker/api/client.py", line 270, in _raise_for_status
raise create_api_error_from_http_exception(e)
File "/home/airflow/.local/lib/python3.7/site-packages/docker/errors.py", line 31, in create_api_error_from_http_exception
raise cls(e, response=response, explanation=explanation)
docker.errors.APIError: 409 Client Error for http+docker://localhost/v1.39/containers/create?name=example_workers_my_app_1: Conflict ("Conflict. The container name "/example_workers_my_app_1" is already in use by container "7e40949857be7a3e2ea3b6001562d0cde285c4ca2a0aa1add4851021ba9a9ed6". You have to remove (or rename) that container to be able to reuse that name.")

如果我手动删除容器并在DockerOperator中设置auto_remove=True,那么它可以正常工作,但我有两个问题:我必须现在在操作符中配置容器(例如设置正确的网络,环境变量等),这意味着复制docker-compose。yml设置。其次,不断地创建和删除相同的容器,而不是使用现有的容器,必须有一点开销。

我应该使用另一种操作符类型,如BashOperator来实现所需的结果吗?我是否能够获得尽可能多的有用(例如docker特定的日志或其他东西)信息关于我的工作使用BashOperator,如果我使用DockerOperator?

查看代码,似乎DockerOperator旨在创建新容器,而不是启动现有容器。所以我不认为你会有很大的成功这样做,除非你启动一个新的容器与特权访问docker套接字,然后执行命令启动现有的docker容器。这感觉有点迂回,尽管它在技术上是一个解决方案。

我建议使用BashOperator(如您所提到的)或使用像docker - py这样的包从python DAG中执行docker启动命令。

事实证明,我在docker中运行气流本身时解决问题的最简单方法(这使得很难使用BashOperator,因为容器内没有docker可执行文件),并且有一个DockerOperator启动和运行只是修改DockerOperator本身,以便当它找到一个具有相同名称的容器时,它不会引发异常,但只是启动找到的容器。

。这是一个肮脏的解决方案。DockerOperator在实现它之后,当有一个现有的容器时,它开始默默地忽略它的大部分参数。此外,修改源代码会使将来更新应用程序变得更加困难。

在docker- composition -based AirFlow中,DockerOperator源代码位于/home/airflow/.local/lib/python3.7/site-packages/airflow/providers/docker/operators/docker.py

我已经包装了这一部分(感谢@kthompso指向文件中的确切位置):

self.container = self.cli.create_container(
...
)

Withtry .. except:

try:
self.container = self.cli.create_container(
...
)
except APIError as e:
if (e.status_code != 409) or ('container name' not in e.explanation):
raise e
self.container = self.cli.containers(
filters={'name': self.container_name}, 
all=True, quiet=True, latest=True
)[0]

如果docker客户端由于409 Conflict导致创建指定名称的容器失败,并且错误解释中提到"容器名称";那么容器可能已经存在,并且可以使用相同的docker客户端获取。幸运的是,其中有一个合适的方法。部分文档字符串解释使用的参数:

"""
List containers. Similar to the ``docker ps`` command.
Args:
quiet (bool): Only display numeric Ids
all (bool): Show all containers. Only running containers are shown
by default
...
latest (bool): Show only the latest created container, include
non-running ones.
...
filters (dict): Filters to be processed on the image list.
Available filters:
...
- `name` (str): The name of the container.
...
Returns:
A list of dicts, one per container
"""

最新更新