private MapState<String, EventsHistory> eventsMap = null;
public void processElement2(Event event,
Context context,
Collector<JoinedEvent> collector) throws Exception {
String name = event.getExperimentName();
if (eventsMap.get(name) == null) {
eventsMap.put(name, new EventsHistory());
}
eventsMap.get(name).put(event.getEventTime(), event);
}
class EventsHistory {
private final Map<Long, Event> events = new HashMap<>();
public Map<Long, Event> getEvents() {
return events;
}
public void put(final Long eventTime, final Event event) {
events.put(eventTime, event);
}
}
我有上面的代码,我想使用Flink的MapState
来维护地图地图。当我在本地测试时,我可以看到状态更新很好。但是当我在集群中运行它时,eventsMap
总是空的。
在MapState
中使用地图映射是否有效?有更好的方法来实现这一点吗?
作为替代,我尝试了下面的版本,在那里我自己进行分组。奇怪的是,这是有效的。
private MapState<EventKey, Event> assignmentEventsMap = null;
public final class EventKey {
private String name;
private long eventTime;
}
public void processElement2(Event event,
Context context,
Collector<JoinedEvent> collector) throws Exception {
String name = event.getExperimentName();
eventsMap
.put(new EventKey(event.getName(), event.getEventTime()),
event);
}
您共享的代码很难理解,但也许您误解了MapState是什么。ValueState提供了一个分片的键/值存储,分布在集群中。MapState为您提供了一个分片的键/值存储,其中的值本身就是嵌套的Maps。
换句话说,MapState始终是地图中的地图。你最终试图创建一个地图的地图——这太远了一个层次。
我假设你正在尝试构建这个结构,在那里你实际上有一个从实验名称到时间戳到事件的嵌套映射的映射:
name -> (time -> event)
假设您的事件流已经由实验名称键入,那么您真正想要的不是使用MapState<String, EventsHistory> eventsMap
,而是MapState<Long, Event> eventsMap
,而不是
eventsMap.get(name).put(event.getEventTime(), event);
你应该做
eventsMap.put(event.getEventTime(), event);
请参阅有关ValueState的教程和Flink文档中使用MapState的示例,了解如何使用这些机制的更多背景知识。