Guava缓存为Flink中的ValueState



我正在尝试消除Flink管道中的重复事件。我正在尝试使用番石榴缓存来做到这一点。我的要求是,我想在1分钟内消除重复。但在任何给定的点上,我都希望在缓存中维护不超过10000个元素。

我的Flink窗口实验的小背景:

  • 翻滚窗口:我能够使用翻滚窗口+自定义触发器来实现这一点。但问题是,如果一个元素出现在第59分钟和第61分钟,它就不会被识别为重复
  • 滑动窗口:我还尝试了10秒重叠+自定义触发器的滑动窗口。但第55秒出现的元素是5个不同窗口的一部分,它被写入水槽5次

如果我不应该在窗口中看到上述行为,请告诉我

返回番石榴:

我有一个看起来像这样的Event和一个看起来类似这样的事件的EventsWrapper。我将得到一个EventsWrappers流。我应该在不同的EventsWrappers中删除重复的Events。例如,如果我有2个EventsWrappers,如下所示:

[EventsWrapper{id='ew1',org='org1',events=[Event{id='1',name='event1'},事件{id='e2',name='event2'}]},EventsWrapper{id='ew2',org='rg2',events=[Event{id='1',name='event1'},事件{id='e3',name='event3'}]}

我应该发出以下作为输出:

[EventsWrapper{id='ew1',org='org1',events=[Event{id='1',name='event1'},事件{id='e2',name='event2'}]},EventsWrapper{id='ew2',org='org2',events=[Event{id='3',name='event3'}]}

即确保e1事件只发出一次,假设这两个事件在缓存的时间和大小要求内。

我创建了一个RichFlatmap函数,在这里我启动了一个番石榴缓存和值状态,就像这样。并将Guava缓存设置为这样的值状态。我的整个管道看起来是这样的。

但每次我尝试更新值状态内的番石榴缓存时:

eventsState.value().put(eventId, true);

我得到以下错误:

java.lang.NullPointerException
at com.google.common.cache.LocalCache.hash(LocalCache.java:1696)
at com.google.common.cache.LocalCache.put(LocalCache.java:4180)
at com.google.common.cache.LocalCache$LocalManualCache.put(LocalCache.java:4888)
at events.piepline.DeduplicatingFlatmap.lambda$flatMap$0(DeduplicatingFlatmap.java:59)
at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:176)

经过进一步挖掘,我发现错误是因为Guava缓存中的keyEquivalence为空。我通过直接在Guava缓存上设置进行检查(不是通过状态,而是直接在缓存上(,这很好。

我觉得这可能是因为ValueState无法序列化GuavaCache。所以我添加了一个像这样的序列化程序,并像这样注册:

env.registerTypeWithKryoSerializer((Class<Cache<String,Boolean>>)(Class<?>)Cache.class, CacheSerializer.class);但这也无济于事。

我有以下问题:

  1. 知道我在上述情况下可能对Guava缓存做了什么错误吗
  2. 我在翻滚和滑动窗口实现中看到的是预期的吗?还是我做错了什么
  3. 如果我不在ValueState中设置Guava缓存,而只是将其用作DeduplingFlatmap类中的普通对象,并直接在Guava缓存上操作,而不是通过ValueState操作,会发生什么?我的理解是,Guava缓存不会是检查点的一部分。因此,当管道出现故障并重新启动时,GuavaCahe将丢失其中的所有值,并且在重新启动时将为空。这种理解正确吗

非常感谢您的帮助。

  1. 见下文
  2. 这些窗户的性能如预期
  3. 你的理解是正确的

即使你确实让它工作了,使用Guava缓存作为ValueState也会表现得很差,因为RocksDB将在每次访问时反序列化整个缓存,并在每次更新时重新序列化它。

此外,看起来您正试图在所有组织中共享一个缓存实例,而这些组织恰好在一个平面图实例中多路复用。这是行不通的,因为RocksDB状态后端将为每个组织制作一个缓存副本(这是所涉及的序列化的副作用(。

您的要求并不完全清楚,但重复数据消除查询可能会有所帮助。但我认为MapState与KeyedProcessFunction中的计时器相结合更有可能是您需要的构建块。这里有一个例子可能会帮助你开始(但你会希望以不同的方式处理计时器(。

相关内容

  • 没有找到相关文章

最新更新