How can I pass an XCom value to PythonVirtualenvOperator as an argument?



Airflow 2.2.4

I want to create a PythonVirtualenvOperator that pulls its list of requirements from another task.

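    from airflow import DAG
    from airflow.decorators import task
    from airflow.operators.python import PythonVirtualenvOperator

    with DAG(dag_id="crawler") as dag:

        @task(task_id="read_requirements")
        def read_requirements():
            # Read one requirement per line from the file.
            requirements_file_path = "requirements.txt"
            with open(requirements_file_path, "r") as f:
                requirements = f.readlines()
            return requirements

        def crawler():
            print("hello_world")

        crawler_op = PythonVirtualenvOperator(
            task_id="crawler",
            python_callable=crawler,
            # Intended to resolve to the list returned by read_requirements.
            requirements="{{ ti.xcom_pull(task_ids='read_requirements') }}",
        )

        read_requirements() >> crawler_op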

Unfortunately, the requirements field of PythonVirtualenvOperator does not render templates, so the expression is read as a plain string. How can I achieve this?
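For context, Airflow renders Jinja only in the attributes an operator lists in its template_fields; anything else reaches execute() exactly as written. The sketch below is a simplified stand-in for that mechanism, not Airflow's actual code: ToyOperator, render, render_templates, and the rendered_field attribute are hypothetical names, and a small regex substitution stands in for Jinja.

```python
import re


class ToyOperator:
    """Simplified stand-in for an operator; NOT Airflow's real implementation."""

    # Only attributes named here get rendered before execution.
    template_fields = ("rendered_field",)

    def __init__(self, rendered_field, requirements):
        self.rendered_field = rendered_field
        self.requirements = requirements


def render(value, context):
    """Replace each {{ name }} placeholder with str(context[name])."""
    return re.sub(
        r"\{\{\s*(\w+)\s*\}\}",
        lambda match: str(context[match.group(1)]),
        value,
    )


def render_templates(op, context):
    """Render only the attributes listed in op.template_fields."""
    for field in op.template_fields:
        setattr(op, field, render(getattr(op, field), context))


op = ToyOperator(rendered_field="{{ run_id }}", requirements="{{ run_id }}")
render_templates(op, {"run_id": "manual_1"})

print(op.rendered_field)  # rendered, because it is listed in template_fields
print(op.requirements)    # left as the raw template string
```

In this toy model, requirements keeps its placeholder because it is missing from template_fields, which mirrors the behavior described in the question.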

I managed to solve this by wrapping the PythonVirtualenvOperator in a decorated task function that handles the context.

    from airflow import DAG
    from airflow.decorators import task
    from airflow.models import Variable
    from airflow.operators.python import PythonVirtualenvOperator, get_current_context

    with DAG(dag_id="crawler") as dag:

        @task(task_id="read_requirements")
        def read_requirements(scraper_path):
            requirements_file_path = f"{scraper_path}/requirements.txt"
            with open(requirements_file_path, "r") as f:
                requirements = f.readlines()
            return requirements

        scraper_dir = Variable.get("workspace_dir")
        requirements = read_requirements(scraper_dir)

        def crawler():
            print("hello_world")

        @task(task_id="crawler_task")
        def crawler_task():
            # Pull the requirements at run time, once the XCom exists.
            context = get_current_context()
            ti = context["ti"]
            requirements = ti.xcom_pull(task_ids="read_requirements")
            # Build the operator inside the task and execute it manually.
            venv_crawler = PythonVirtualenvOperator(
                task_id="venv_crawler",
                requirements=requirements,
                python_callable=crawler,
                dag=dag,
            )
            venv_crawler.execute(context=context)

        requirements >> crawler_task()

It feels hacky, but it does work. Fortunately, newer Airflow versions let you pass a .txt file instead.
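One detail worth checking in the workaround: readlines() keeps the trailing newline on every line and also returns blank lines and comments, which may not be accepted cleanly as requirement specifiers. A minimal sketch of a cleanup step follows; parse_requirements is a hypothetical helper, not part of Airflow, and the sample package names are only illustrative.

```python
from typing import Iterable, List


def parse_requirements(lines: Iterable[str]) -> List[str]:
    """Strip whitespace and drop blank lines and '#' comment lines."""
    cleaned = []
    for line in lines:
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        cleaned.append(line)
    return cleaned


# Illustrative input, shaped like the output of f.readlines().
sample = ["requests==2.27.1\n", "\n", "# scraping\n", "beautifulsoup4\n"]
print(parse_requirements(sample))
```

Under that assumption, read_requirements could return parse_requirements(f.readlines()) so the list pulled from XCom is already clean.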
