消息到达AWS SQS队列后触发气流DAG



是否可以在消息到达SQS队列时安排DAG运行?我还需要日期来处理队列中的消息。据我所知,这可以通过使用SQSSensor来完成,但我找不到任何例子,我对如何前进感到困惑。

气流以固定的间隔运行dag,而您现在希望每个事件触发dag。你必须在气流之外做到这一点,例如使用Lambda触发器监听队列,它通过REST API触发气流DAG。

气流中的SQSSensor不允许逐个事件处理,因为它只是在DAG运行开始后轮询队列(检查新消息,将它们推送到带有关键消息的XCom,如果找到则删除消息)。因此,如果您的DAG运行计划为每天一次,那么SQSSensor将每天只开始轮询一次新消息。

我在气流中找不到一个SQSOperator来读取SQS消息,所以要创建一个事件触发的SQS +气流工作流,我最好的猜测是通过REST API设置一个Lambda来触发气流DAG, DAG本身将从读取队列上所有消息的SQSSensor开始,然后其他任务读取和处理由SQSSensor任务创建的XCom的值。DAG的schedule_interval可以设置为None,因为它将通过REST API触发。

最新更新