我正试图使用Flink来以流媒体的热情消费来自消息队列的有界数据。数据将采用以下格式:
{"id":-1,"name":"Start"}
{"id":1,"name":"Foo 1"}
{"id":2,"name":"Foo 2"}
{"id":3,"name":"Foo 3"}
{"id":4,"name":"Foo 4"}
{"id":5,"name":"Foo 5"}
...
{"id":-2,"name":"End"}
消息的开始和结束可以使用事件id来确定。我想接收这样的批,并将最新的批(通过覆盖(存储在磁盘或内存中。我可以编写一个自定义窗口触发器,使用开始和结束标志提取事件,如下所示:
DataStream<Foo> fooDataStream = ...
AllWindowedStream<Foo, GlobalWindow> fooWindow = fooDataStream.windowAll(GlobalWindows.create())
.trigger(new CustomTrigger<>())
.evictor(new Evictor<Foo, GlobalWindow>() {
@Override
public void evictBefore(Iterable<TimestampedValue<Foo>> elements, int size, GlobalWindow window, EvictorContext evictorContext) {
for (Iterator<TimestampedValue<Foo>> iterator = elements.iterator();
iterator.hasNext(); ) {
TimestampedValue<Foo> foo = iterator.next();
if (foo.getValue().getId() < 0) {
iterator.remove();
}
}
}
@Override
public void evictAfter(Iterable<TimestampedValue<Foo>> elements, int size, GlobalWindow window, EvictorContext evictorContext) {
}
});
但是我怎样才能坚持最新窗口的输出。一种方法是使用ProcessAllWindowFunction
接收所有事件并手动将其写入磁盘,但这感觉像是黑客攻击。我也在研究带有Flink CEP模式的Table API(就像这个问题一样(,但找不到在每个批次之后清除Table以丢弃前一个批次中的事件的方法。
有几件事阻碍了您的需求:
(1( Flink的窗口操作符生成附加流,而不是更新流。它们不是为了更新以前发出的结果而设计的。CEP也不生成更新流。
(2( Flink的文件系统抽象不支持覆盖文件。这是因为像S3这样的对象存储不太支持这种操作。
我认为你的选择是:
(1( 返工作业,使其生成更新(变更日志(流。您可以使用toChangelogStream,也可以使用创建更新流的Table/SQL操作,如GROUP BY
(在没有时间窗口的情况下使用(。除此之外,您还需要选择一个支持收回/更新的接收器,例如数据库。
(2( 坚持生成一个附加流,并使用类似FileSink
的东西将结果写入一系列滚动文件。然后在Flink之外编写一些脚本,以获得您想要的内容。