Apache Flink 异步请求和窗口



我无法选择合适的窗口函数/分配器。任务如下。首先,我从具有request_id和一些数据的源获取数据,并对外部数据库执行异步请求。

// Here String is for request_id, Data is for treated data
DataStream Tuple2<String, Data> stream = ...
// async I/O queries
DataStream<Tuple2<String, String>> resultStream =
AsyncDataStream.unorderedWait(
stream,
new AsyncDatabaseRequest(),
1000,
TimeUnit.MILLISECONDS,
100
);

现在我想通过request_id收集所有数据并进行一些计算。

DataStream Tuple2<String, Integer> = result
.map(val -> new Tuple2<String, Integer>(val.f0, val.f1.data_int))
.keyBy(0)
.window(...)
.sum(1);

问题是窗口功能。我需要每个窗口包含具有相同request_id的所有数据点,但异步查询的时间可能从毫秒到几分钟不等。另一方面,我需要低延迟,所以我不能使用ProcessingTimeSessionWindows.withGap(Time.minutes(10)).我需要在从异步函数获得最后一个数据后立即执行计算。

对我来说最好的是使用来自异步函数的窗口水印,它当然知道每个查询何时完成以及它承载的马赫点。这是否可能,此类任务的最佳实践是什么?

好吧,我找到了解决方案,它似乎非常简单。 我只使用事件时间。在我的源函数中,我生成事件时间戳和水印,如下所示:

Long ts = System.currentTimeMillis();
ctx.collectWithTimestamp(data, ts);
ctx.emitWatermark(new Watermark(ts + 1));

在流中,我使用 EventTime 函数:

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<...> dataStream = ...;
DataStream<...> newStream = dataStream
.keyBy(0)
.timeWindow(Time.milliseconds(1))
.reduce(new Reducer());

通过这种方式,我避免了超时,结果立即准备就绪。

相关内容

  • 没有找到相关文章

最新更新