如何从Apache Flink的数据库中查找和更新记录的状态



我正在研究一个数据流应用程序,我正在研究在这个项目中使用Apache Flink的可能性。这样做的主要原因是它支持很好的高级流结构,非常类似于Java 8的流API。

我将接收与数据库中特定记录对应的事件,我希望能够处理这些事件(来自RabbitMQ或Kafka等消息代理),并最终更新数据库中的记录,并将处理/转换的事件推送到另一个接收器(可能是另一个消息代理)。

与特定记录相关的事件理想情况下需要以FIFO顺序处理(尽管也会有一个时间戳来帮助检测无序事件),但与不同记录相关的事件可以并行处理。我计划使用keyBy()构造按记录对流进行分区。

需要进行的处理取决于数据库中关于该记录的当前信息。但是,我无法找到一个示例或推荐的方法来查询数据库中的此类记录,以便用处理它所需的附加信息来丰富正在处理的事件。

我想到的管道如下:

-> keyBy()在接收到的id上->从数据库中检索id对应的记录->在记录上执行处理步骤->将处理的事件推送到外部队列并更新数据库记录

数据库记录将需要更新,因为另一个应用程序将查询数据。

在实现这个管道之后,可能还可以进行其他优化。例如,可以将(更新的)记录缓存为托管状态,以便同一记录上的下一个事件不需要另一个数据库查询。但是,如果应用程序不知道特定的记录,则需要从数据库中检索它。

在Apache Flink中使用这种场景的最佳方法是什么?

您可以通过扩展丰富的函数来执行数据库查找,例如RichFlatMap函数,在其open()方法中初始化数据库连接一次,然后在flatMap()方法中处理每个事件:

public static class DatabaseMapper extends RichFlatMapFunction<Event, EncrichedEvent> {
    // Declare DB coonection and query statements
    @Override
    public void open(Configuration parameters) throws Exception {
      // Initialize Database connection
      // Prepare Query statements
    }
    @Override
    public void flatMap(Event currentEvent, Collector<EncrichedEvent> out) throws Exception {
      // look up the Database, update record, enrich event
      out.collect(enrichedEvent);        
    }
})

然后你可以使用DatabaseMapper如下所示:

stream.keyby(id)
      .flatmap(new DatabaseMapper())
      .addSink(..);

你可以在这里找到一个使用Redis缓存数据的例子

相关内容

  • 没有找到相关文章

最新更新