我正在研究一个数据流应用程序,我正在研究在这个项目中使用Apache Flink的可能性。这样做的主要原因是它支持很好的高级流结构,非常类似于Java 8的流API。
我将接收与数据库中特定记录对应的事件,我希望能够处理这些事件(来自RabbitMQ或Kafka等消息代理),并最终更新数据库中的记录,并将处理/转换的事件推送到另一个接收器(可能是另一个消息代理)。
与特定记录相关的事件理想情况下需要以FIFO顺序处理(尽管也会有一个时间戳来帮助检测无序事件),但与不同记录相关的事件可以并行处理。我计划使用keyBy()
构造按记录对流进行分区。
需要进行的处理取决于数据库中关于该记录的当前信息。但是,我无法找到一个示例或推荐的方法来查询数据库中的此类记录,以便用处理它所需的附加信息来丰富正在处理的事件。
我想到的管道如下:
-> keyBy()在接收到的id上->从数据库中检索id对应的记录->在记录上执行处理步骤->将处理的事件推送到外部队列并更新数据库记录
数据库记录将需要更新,因为另一个应用程序将查询数据。
在实现这个管道之后,可能还可以进行其他优化。例如,可以将(更新的)记录缓存为托管状态,以便同一记录上的下一个事件不需要另一个数据库查询。但是,如果应用程序不知道特定的记录,则需要从数据库中检索它。
在Apache Flink中使用这种场景的最佳方法是什么?
您可以通过扩展丰富的函数来执行数据库查找,例如RichFlatMap
函数,在其open()
方法中初始化数据库连接一次,然后在flatMap()
方法中处理每个事件:
public static class DatabaseMapper extends RichFlatMapFunction<Event, EncrichedEvent> {
// Declare DB coonection and query statements
@Override
public void open(Configuration parameters) throws Exception {
// Initialize Database connection
// Prepare Query statements
}
@Override
public void flatMap(Event currentEvent, Collector<EncrichedEvent> out) throws Exception {
// look up the Database, update record, enrich event
out.collect(enrichedEvent);
}
})
然后你可以使用DatabaseMapper
如下所示:
stream.keyby(id)
.flatmap(new DatabaseMapper())
.addSink(..);
你可以在这里找到一个使用Redis缓存数据的例子