指定.py文件通过带有apache气流的BashOperator()运行所需的依赖项



我正在尝试使用apache airflow和docker来自动化每天运行webscraper的过程。我已经启动并运行了气流服务器,我可以通过本地服务器上的气流GUI手动初始化我的dag,但它失败了。

我甚至不知道在哪里可以看到触发了什么错误。我的dag.py文件在下面。。。您可以看到我试图在哪里使用BashOperator函数来运行脚本。我怀疑问题出在scraper使用的依赖关系上,但我不确定如何集成配置文件和通过apache/docker运行脚本所需的其他包。

from airflow.models import DAG
from datetime import datetime, timedelta
from airflow.operators.bash_operator import BashOperator
dag =  DAG("MI_Spider", start_date=datetime(2021,1,1), schedule_interval="@daily", catchup=False) 
curl = BashOperator(
task_id='testingbash',
bash_command="python ~/spider/path/MichiganSpider.py",
dag=dag)

我应该将spider文件和配置文件移到airflow项目目录中,还是以某种方式将依赖项直接安装到我正在使用的docker容器中,同时在docker容器内设置env变量,而不是通过单独的配置文件调用db登录凭据?当我手动运行它时,我一直在为scraper使用conda-env。有什么办法可以让我使用那种环境吗?

我是docker和apache气流的新手,所以如果这件事很明显,我很抱歉。

提前感谢!!

假设您使用的是Airflow的最新版本,我建议您重构DAG以使用PythonVirtualenvOperatorhttps://airflow.apache.org/docs/apache-airflow/stable/howto/operator/python.html#pythonvirtualenvoperator而不是BashOperator。

以下是如何在Airflow中使用python操作符的示例:https://airflow.apache.org/docs/apache-airflow/stable/_modules/airflow/example_dags/example_python_operator.html

与您相关的部分是:

import logging
import shutil
import time
from pprint import pprint
import pendulum
from airflow import DAG
from airflow.decorators import task
log = logging.getLogger(__name__)

with DAG(
dag_id='example_python_operator',
schedule_interval=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=['example'],
) as dag:
if not shutil.which("virtualenv"):
log.warning("The virtalenv_python example task requires virtualenv, please install it.")
else:
@task.virtualenv(
task_id="virtualenv_python", requirements=["colorama==0.4.0"], system_site_packages=False
)
def callable_virtualenv():
"""
Example function that will be performed in a virtual environment.
Importing at the module level ensures that it will not attempt to import the
library before it is installed.
"""
from time import sleep
from colorama import Back, Fore, Style
print(Fore.RED + 'some red text')
print(Back.GREEN + 'and with a green background')
print(Style.DIM + 'and in dim text')
print(Style.RESET_ALL)
for _ in range(10):
print(Style.DIM + 'Please wait...', flush=True)
sleep(10)
print('Finished')
virtualenv_task = callable_virtualenv()

只需记住在您的Airflow图像中提供virtualenv包即可。

最新更新