如何在虚拟环境中运行Airflow PythonOperator



我有几个 python 文件,我目前正在使用 BashOperator 执行。这使我能够灵活地轻松选择python虚拟环境。

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
...}
dag = DAG('python_tasks', default_args=default_args, schedule_interval="23 4 * * *")
t1 = BashOperator(
task_id='task1',
bash_command='~/anaconda3/envs/myenv/bin/python 
/python_files/python_task1.py',
dag=dag)

我怎样才能使用PythonOperator来实现同样的事情?

from airflow.operators.bash_operator import PythonOperator
import python_files.python_task1
python_task = PythonOperator(
task_id='python_task',
python_callable=python_task1.main,
dag=dag)

我假设PythonOperator将使用系统python环境。 我发现Airflow具有PythonVirtualenvOperator,但这似乎是通过使用指定的要求动态创建新的虚拟环境来工作的。我更喜欢使用已经正确配置的现有版本。 如何使用指定的 python 路径运行 PythonOperator?

我的解决方法是使用 Bash 运算符调用/path/to/project/venv/bin/python my.py

首先:您不应该(通常)依赖操作员预先存在的资源。您的运算符应该是可移植的,因此使用长期存在的 virtualenvs 有点违背了这一原则。话虽如此,这没什么大不了的,就像您必须将软件包预安装到全局环境中一样,您可以预烘焙一些环境。或者,您可以让操作员创建环境,后续操作员可以重用它 - 我相信这是最简单和最危险的方法。

实现"虚拟环境缓存"应该不难。读取PythonVirtualenvOperator的执行方法的实现:

def execute_callable(self):
with TemporaryDirectory(prefix='venv') as tmp_dir:
...
self._execute_in_subprocess(
self._generate_python_cmd(tmp_dir,
script_filename,
input_filename,
output_filename,
string_args_filename))
return self._read_result(output_filename)

所以看起来它没有明确删除 virtualenv(它依赖于TemporaryDirectory来做到这一点)。您可以子类PythonVirtualenvOperator,只需使用自己的上下文管理器来重用临时目录:

import glob
@contextmanager
def ReusableTemporaryDirectory(prefix):
try:
existing = glob.glob('/tmp/' + prefix + '*')
if len(existing):
name = existing[0]
else:
name = mkdtemp(prefix=prefix)
yield name
finally:
# simply don't delete the tmp dir
pass
def execute_callable(self):
with ReusableTemporaryDirectory(prefix='cached-venv') as tmp_dir:
...

当然,您可以摆脱ReusableTemporaryDirectory中的try-finally并放回通常的suffixdir参数,我做了最小的更改,以便于与原始TemporaryDirectory类进行比较。

这样,您的 virtualenv 不会被丢弃,但操作员最终会安装较新的依赖项。

使用 PythonVirtualenvOperator。您需要提供要运行的 python 函数和虚拟环境的要求.txt。

最新更新