我正在为一个数据科学家团队建立一个多用户气流集群,DAG具有各种用途(ETL,NLP,ML,NN...(,其中一些具有特定的python依赖项。 我不能简单地在系统级别添加所有 DAG 依赖项。当然,我可以为常见用途设置基线,但对于特定需求,依赖压缩的 DAG 功能将非常有帮助。
因此,为了解决这个多上下文问题,我正在测试 Airflow 1.9.0(在 Ubuntu 16.04 上(的打包 DAG 功能。
我正在按照示例使用任意 pypi 包对其进行测试。
- 我随机选择了一个python模块(python-crontab(。(在此之前,我尝试使用更强大的模块,但重现测试需要更长的时间(
- 测试方案:能够导入该模块并在压缩的 DAG 中打印其版本
-
这是我这样做的方式:
$ virtualenv venv --python=python3 $ source venv/bin/activate (venv) $ mkdir contents && cd contents $ pip install --install-option="--install-lib=$PWD" python-crontab $ cp ../my_dag.py . $ zip -r ../test_zip_2.zip * $ cp ../test_zip_2.zip /path/to/dags $ journalctl -f -u airflow-scheduler.service (...) WARNING - No viable dags retrieved from /path/to/dags/test_zip_2.zip
-
我的DAG的内容:
import crontab import airflow.utils.dates as a_dates from airflow.operators.python_operator import PythonOperator from airflow.operators.dummy_operator import DummyOperator from airflow.models import DAG from pprint import pprint args = { 'owner': 'airflow', 'start_date': a_dates.days_ago(1) } def print_context(ds, **kwargs): pprint(kwargs) print(ds) print(crontab.__version__) return 'Whatever you return gets printed in the logs' with DAG(dag_id='test_zip', default_args=args, schedule_interval=None) as dag: ( PythonOperator( task_id='print_the_context', provide_context=True, python_callable=print_context, ) >> DummyOperator( task_id='do_nothing' ) )
检查代码后,如果找到不包含单词"DAG"和"airflow"的.py文件,则解析 ZIP 文件的逻辑似乎会立即退出。
问题是,我上面描述的方法实际上在存档的根目录中生成了其他.py文件。
$ ll
total 100
drwxr-xr-x 1 vagrant vagrant 442 Jun 1 14:48 ./
drwxr-xr-x 1 vagrant vagrant 306 Jun 1 15:30 ../
-rw-rw-r-- 1 vagrant vagrant 3904 Dec 30 2015 cronlog.py
-rw-rw-r-- 1 vagrant vagrant 44651 May 25 16:44 crontab.py
-rw-rw-r-- 1 vagrant vagrant 4438 Dec 28 2015 crontabs.py
drwxr-xr-x 1 vagrant vagrant 476 Jun 1 14:26 dateutil/
-rw-r--r-- 1 vagrant vagrant 6148 Jun 1 14:24 .DS_Store
drwxr-xr-x 1 vagrant vagrant 204 Jun 1 14:26 __pycache__/
drwxr-xr-x 1 vagrant vagrant 272 Jun 1 14:26 python_crontab-2.3.3-py3.5.egg-info/
drwxr-xr-x 1 vagrant vagrant 306 Jun 1 14:26 python_dateutil-2.7.3-py3.5.egg-info/
drwxr-xr-x 1 vagrant vagrant 238 Jun 1 14:26 six-1.11.0-py3.5.egg-info/
-rw-rw-r-- 1 vagrant vagrant 30888 Sep 17 2017 six.py
-rw-r--r-- 1 vagrant vagrant 832 Jun 1 14:48 my_dag.py
不过,我测试的许多知名软件包都会生成这些顶级.py文件。例如。安装Scrapy,Numpy,Pandas等产生了同样的混乱。
那么,我的选择是什么(没有分叉气流^_^(?
我是否正确理解此功能?
感谢您的帮助!
对于那些现在阅读本文的人,应该遵循下面的更新说明,从 v1.10.3 开始。
注意
搜索 DAG 时,默认情况下,Airflow 仅考虑包含字符串"airflow"和"DAG"的 python 文件。若要改为考虑所有 python 文件,请禁用
DAG_DISCOVERY_SAFE_MODE
配置标志。
https://github.com/apache/airflow/blob/1.10.3/docs/concepts.rst#dags
https://github.com/apache/airflow/blob/1.10.3/UPDATING.md#new-dag_discovery_safe_mode-config-option-1
编辑:该修复程序已合并到1.10版稳定版中,应该不再发生。
不幸的是,在代码的当前状态下,您想要的内容似乎是不可能的。
我已经在Apache Airflow的GitHub上提出了关于这个问题的拉取请求;如果你有兴趣跟随的话。