Apache Airflow中的自定义操作符状态(排队,成功等)?



在Apache的气流(2.x),每个操作员实例有一个状态定义在这里(气流源的repo)。

我有两个用例似乎没有明确地落入预定义的状态:

  1. 警告,但不要失败-这似乎应该是一个非常标准的用例,我很惊讶没有在开箱即用的气流源代码中看到它。基本上,我想用一些引人注目的颜色来标记一个节点——比如橙色——与非致命警告相对应,但在其他情况下继续正常执行。显然,您可以将警告打印到日志中,但是找到它们比仅仅查看DAGs页面上的彩色圆圈需要更多的工作。

  2. "传感器N/A"或"数据未准备好";当传感器注意到源系统中的数据尚未准备好时,可以跳过下游操作,直到下一个DAG执行,但数据管道中没有任何损坏。基本上是预期的分支结束。

是否有一种好的方法可以用开箱即用的气流节点状态来实现这些用例中的任何一个?如果没有,是否有一种方法来定义自定义操作符状态?由于我在托管服务(MWAA)上运行气流,我不认为更改部署的源代码是一个选项。

谢谢,

任务状态与气流紧密结合。没有办法配置哪个日志级别会导致哪个状态。我想说,最简单的方法是在日志文件中搜索"WARNING"或者设置一个日志聚合服务,例如Elasticsearch,使日志文件可搜索。

对于#2,传感器不知道为什么传感器超时。在到达timeoutexecution_timeout之后,它们只是引发一个异常。您可以使用trigger_rules处理异常,但是这些规则仍然不会考虑异常的原因。

如果你想对此有更多的控制,我会实现你自己的传感器,它需要一个参数,例如data_not_ready_timeout(比timeoutexecution_timeout小)。在poke()方法中,检查是否达到data_not_ready_timeout,如果达到,则引发AirflowSkipException。这将跳过下游任务。一旦到达timeoutexecution_timeout,任务失败。查看BaseSensorOperator.execute()获取传感器初始起始日期的一些灵感。

最新更新