每当我收到一条消息时,我都想从数据库中读取,可能返回数百万行,然后我想顺流而下。这在 Flink 中被认为是好的做法吗?
public static class StatsReader implements FlatMapFunction<Msg, Json> {
Transactor txor =
...;
@Override
public void flatMap(Msg msg, Collector<Json> out) {
//Possibly lazy and async stream
java.util.Stream<Json> results =
txor.exec(Stats.read(msg));
results.foreach(stat->out.collect(stat));
}
}
编辑:
背景:我想动态运行报告。数据库基本上充当一个巨大的窗口。该报告基于该窗口 + 实时数据。该报告是高度可定制的,因此很难预处理结果或先验定义管道。
我今天使用vanilla java,管道大致是这样的:ReportDefinition -> ( 弹性搜索查询 + 实时流 ( -> ( ReportProcessingPipeline ( -> ( Websocket push (
应该是可能的。但是,我建议使用AsyncFunction
而不是FlatMapFunction
。
请注意,此类设置可能需要调整检查点参数,例如检查点间隔。