我有两个流。它们都是 1 小时窗口内的聚合数据。我想压缩这些流,以便将同一时间跨度的聚合一起更新,如果现在存在这种对应匹配,则可能使用空值。
DataStream<OneHourAggA> one =
sourceA
.keyBy(d -> (String) d.values.get("m"))
.timeWindow(Time.hours(1))
.apply(new WorkWindwFolder());
DataSteam<OneHourAggB> other =
sourceB
.keyBy(d -> (String) d.values.get("m"))
.timeWindow(Time.hours(1))
.apply(new WorkWindwFolder());
DataStream<Tuple2<Option<OneHourAggA>,Option<OneHourAggB>> zipped =
sourceA.???(sourceB)
我怎样才能做到这一点?
您必须使用 coGroup
操作来执行聚合结果的外部联接。您将对 coGroup 操作使用相同的时间窗口规范。这是有效的,因为前一个窗口的聚合结果只会为每个窗口生成一个元素,并且此元素将获得分配的此窗口的最大时间戳。