我实现了气流,我们可以添加数据健全性逻辑吗。假设我有任务1,它执行以下任务
1.Read the data from the data source--RAW DATA.
2. do join with dimensional table to get the some relation detail product name etc.
3. Store output file some location after step 2.
有一个task2将输出文件存储到数据库中。但在执行task2之前,我需要进行一些数据验证,比如RAW data的计数应该等于存储输出文件的计数,即在加入之后类似于count(raw_data(=count(raw_data_join_with_dimension(,如果为true,则触发Task2,否则发送警报并使作业失败。
对于该用例,一个可能的工作流可以是:
check_op = SQLCheckOperator(
task_id='check_task',
sql='YOUR VALIDATION SQL',
conn_id='YOUR CONN',
)
t2_op = YourNextOperator()
failure_op = EmailOperator(subject='check has failed', to='YOUR EMAIL', trigger_rule='one_failed')
check_op >> [t2_op, failure_op]
它的工作原理如下:
SQLCheckOperator
对数据库运行查询。如果查询返回False
,则检查失败,因此操作员将处于Failure
状态。如果查询返回值,则查询视为成功,因此操作员将处于Success
状态- 如果
SQLCheckOperator
状态为失败,则会触发EmailOperator,否则会触发YourNextOperator
编辑:去看看@Elad的答案,这个任务有一个更具体的操作符。
airflow.sensors.sql_sensor.SqlSensor
可用于构建一个可以检查数据质量的任务:
from airflow.sensors import sql_sensor
...
check_data_task = sql_sensor.SqlSensor(
task_id="check_data",
conn_id="YourConnectionIdentifier",
sql="SELECT CASE WHEN data_is_valid THEN 1 ELSE 0 END ...",
timeout=0
)
关键是您的CCD_ 8参数返回"0";至少一个包含非零/空字符串值"——根据文件。timeout=0
意味着它将检查一次,如果您的数据检查查询没有"检查";通过";