希望这是一个简单的问题。
我是一个气流新手,我正试图用气流DAG作为POC取代我的一个etl,但我正在努力做一件基本的事情。
我希望将DAG运行的execution_date或ds宏注入到外部函数中的SQL字符串中,这样我就可以根据execution_date动态移动/聚合数据,这对于作业重新运行和回填很有用。
以下是DAG的基本结构:def create_db_engine():
[redacted]
return engine
def run_query(sql):
engine = create_db_engine()
connection = engine.connect()
data = connection.execute(sql)
return data
def wait_for_data():
sensor_query = f'''
select blah
from table
limit 1
'''
if run_query(sensor_query).rowcount >= 1:
return True
else:
return False
def run_aggregation():
agg_query = f'''
delete from table
where datefield = '{{ prev_ds }}'::DATE;
insert into table(datefield, metric)
select date_trunc('day', timefield), sum(metric)
from sourcetable
where timefield >= '{{ prev_ds }}'::DATE
and timefield < '{{ ds }}'::DATE
group by 1;
'''
run_query(agg_query)
@task
def data_sensor():
PythonSensor(
task_id='check_data',
python_callable=wait_for_data(),
)
@task
def agg_operator(args):
PythonOperator(
task_id='agg_data',
python_callable=run_aggregation()
)
/总结指出:
- 基本前提是等待某些事情发生,然后运行一个能够利用执行日期的查询。
- 我正在尝试使用{{}}宏语法,但它似乎不能在操作符/传感器调用之外使用。
- 我正在使用SQL炼金术,因为我使用IAM角色链与AWS Redshift进行身份验证,我找不到一种方法使其与sqoperator/传感器一起工作。如果有人有解决方案,那将是一个很好的额外答案。
- Python 3.9,气流2.1.2。数据库为Amazon Redshift。
我已经尝试了一些不同的方法使事情工作:
kwargs #1 -根据这里的答案[https://stackoverflow.com/a/36754930/841233],添加provide_context=True应该使变量通过传递到函数中的kwargs作为**kwargs可用。但这对我不起作用。
def data_sensor():
PythonSensor(
task_id='check_data',
python_callable=wait_for_data(),
poke_interval=30,
timeout=3600,
provide_context=True
)
...
ds = kwargs['ds']
prev_ds = kwargs['prev_ds']
...
Error
ds = kwargs['ds']
KeyError: 'ds'
kwargs #2 -这里的答案[https://stackoverflow.com/a/50708735/841233]建议通过templates_dict变量向模板添加您想要的字段。但是这也不行
def data_sensor():
PythonSensor(
task_id='check_data',
python_callable=wait_for_data(),
poke_interval=30,
timeout=3600,
provide_context=True,
templates_dict={'start_date': '{{ prev_ds }}',
'end_date': '{{ ds }}',
'next_date': '{{ next_ds }}'},
)
...
Error
end_date = kwargs.get(['templates_dict']).get(['ds'])
TypeError: unhashable type: 'list'
我的问题是:
- 这是怎么回事?这可能吗?
- 这是实现我所需要的正确范例吗?或者有不那么愚蠢的方法吗?
这实际上是很可能的,也不是一个坏主意,但是你必须通过JINJA模板引擎手动运行你的字符串(这是气流处理传递给操作符的模板参数时所做的)。
Airflow会自动处理添加到templated_fields
列表中的所有字段-但是由于您的操作符是Python代码,因此没有什么可以阻止您手动执行类似的处理。
你不应该像你试图做的那样使用PythonOperator。使用TaskFlow API, @task会自动用PythonOperator包装你的可调用Python方法,这样你的任务就是编写正确的Python代码——甚至不用考虑PythonOperator。
唯一的困难是你需要获得上下文,但这可以很容易地通过get_current_context()方法实现:在气流2.0中使用Taskflow API传递参数
一旦你有了上下文(这正是保存所有{{next_ds}}和其他上下文变量的地方),你就可以简单地用Jinja模板处理字符串,并将上下文传递给Jinja。你可以看到气流内部是如何做到的:https://github.com/apache/airflow/blob/932c3b5df5444970484f0ec23589be1820a3270d/airflow/models/baseoperator.py#L1070 -它有点复杂,因为它处理几个不同的情况,如xcom等,但你可以把它作为一个灵感。
除了Jarek的答案(它确实有效)之外,我还发现了一种更直接的方法来完成我需要的工作,而不需要Jinja模板。事实证明,您可以导入get_current_context函数,并使用它在函数之间传递作业上下文。
from airflow.operators.python import PythonOperator, get_current_context
...
@task
def data_sensor():
context = get_current_context()
PythonSensor(
task_id='check_data',
python_callable=wait_for_data(context),
poke_interval=30,
timeout=3600
)
....
def wait_for_data(context):
end_date = context['ds']
next_date = context['next_ds']