How to update a value in MapState without removing the previous value



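My question this time: when working with a MapState, is it safe to change the current value of a key by calling mapState.put(key, value), or do I need to call mapState.remove(key) and then mapState.put(key, value) again? Or is there some other way to update the value?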

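I am starting from the fact that Flink's state abstractions are not designed for concurrent access and should not be shared between threads. So, to rephrase my question: can I update the value stored under a key in mapState without first removing the key and then putting it back? And how can I use mapState without getting a ConcurrentModificationException, short of setting the operator's parallelism to 1?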

I ask because I am getting this exception:

java.util.ConcurrentModificationException
at java.util.HashMap$HashIterator.nextNode(HashMap.java:1445)
at java.util.HashMap$EntryIterator.next(HashMap.java:1479)
at java.util.HashMap$EntryIterator.next(HashMap.java:1477)
at org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:111)
at org.apache.flink.api.common.typeutils.base.MapSerializer.copy(MapSerializer.java:49)
at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:287)
at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:311)
at org.apache.flink.runtime.state.heap.HeapMapState.get(HeapMapState.java:85)
at org.apache.flink.runtime.state.ttl.TtlMapState.lambda$getWrapped$0(TtlMapState.java:63)
at org.apache.flink.runtime.state.ttl.AbstractTtlDecorator.getWrappedWithTtlCheckAndUpdate(AbstractTtlDecorator.java:92)
at org.apache.flink.runtime.state.ttl.TtlMapState.getWrapped(TtlMapState.java:62)
at org.apache.flink.runtime.state.ttl.TtlMapState.contains(TtlMapState.java:92)
at org.apache.flink.runtime.state.UserFacingMapState.contains(UserFacingMapState.java:72)
at com.teavaro.cep.transformations.SessionUseCase$1.generateSessionRecord(SessionUseCase.java:65)
at com.teavaro.cep.transformations.SessionUseCase$1.generateSessionRecord(SessionUseCase.java:42)
at com.teavaro.cep.operators.SessionIdentificationProcessFunction.process(SessionIdentificationProcessFunction.java:25)
at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:50)
at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32)
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:546)
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:454)
at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:251)
at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:774)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:262)
at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:184)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)

Thank you very much. Kind regards.

You can simply call put to update an entry in MapState. There is no need to remove the existing value first.
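
As a rough illustration of the put-versus-remove-then-put distinction, here is a minimal sketch that uses a plain java.util.HashMap as a stand-in for the map state, since the stack trace in the question shows a HashMap being iterated under HeapMapState. It is not Flink code, it does not diagnose the exception above, and the class and method names (MapUpdateSketch, overwrite, throwsWhileIterating) are hypothetical. It relies only on the documented HashMap rule that replacing the value of an existing key is not a structural modification, whereas removing or adding a mapping is.

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in: a plain HashMap plays the role of the map state.
class MapUpdateSketch {

    // Overwrites the value for a key with a single put; no remove beforehand.
    static Map<String, Long> overwrite(Map<String, Long> state, String key, Long value) {
        state.put(key, value);
        return state;
    }

    // Returns true if updating key "a" while iterating over the map
    // triggers a ConcurrentModificationException.
    static boolean throwsWhileIterating(boolean removeFirst) {
        Map<String, Long> state = new HashMap<>();
        state.put("a", 1L);
        state.put("b", 2L);
        state.put("c", 3L);
        try {
            for (Map.Entry<String, Long> entry : state.entrySet()) {
                if (removeFirst) {
                    // Structural modification: deletes a mapping.
                    state.remove("a");
                }
                // Without the remove, this only replaces an existing value.
                state.put("a", 42L);
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        Map<String, Long> state = new HashMap<>();
        state.put("session", 1L);
        System.out.println(overwrite(state, "session", 2L));
        System.out.println("put only: " + throwsWhileIterating(false));
        System.out.println("remove then put: " + throwsWhileIterating(true));
    }
}
```

In this stand-in, a plain put on an existing key leaves a running iterator valid, while remove followed by put invalidates it. Whether the same reasoning explains the trace in the question depends on how the state is being accessed, which the question does not show.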
