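I have a ProcessWindowFunction that processes TumblingEventTimeWindows, and in it I use a state store to keep some values across multiple tumbling windows. My problem is that this state store is not preserved across the tumbling windows: if I first store something in window [0,999] and then access the store from window [1000,1999], the store is empty. I am aware of the global state and per-window state described here, and I want to use global state. I also tried to create a minimal working example to investigate this: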
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import javax.annotation.Nullable;
public class twStateStoreTest {
    public static void main(String[] args) throws Exception {
        // set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.getConfig().setAutoWatermarkInterval(1000L);
        final DataStream<Element> elements = env.fromElements(
                Element.from(1, 500),
                Element.from(1, 1000),
                Element.from(1, 1500),
                Element.from(1, 2000),
                Element.from(99, 9999)
        ).assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Element>() {
            long w;
            @Nullable
            @Override
            public Watermark getCurrentWatermark() {
                return new Watermark(w);
            }
            @Override
            public long extractTimestamp(Element element, long previousElementTimestamp) {
                w = element.getTimestamp();
                return w;
            }
        });
        elements
                .keyBy(new KeySelector<Element, Integer>() {
                    @Override
                    public Integer getKey(Element element) throws Exception {
                        return element.value;
                    }
                })
                .window(TumblingEventTimeWindows.of(Time.milliseconds(1000L)))
                .process(new MyProcessWindowFn())
                .print();
        // execute program
        env.execute("Flink Streaming Java API Skeleton");
    }
    static class MyProcessWindowFn extends ProcessWindowFunction<Element, String, Integer, TimeWindow> {
        MapState<Integer, Integer> stateStore;
        @Override
        public void open(Configuration parameters) throws Exception {
            stateStore = getRuntimeContext().getMapState(new MapStateDescriptor<Integer, Integer>("stateStore", Integer.class, Integer.class));
        }
        @Override
        public void process(Integer key, Context context, Iterable<Element> elements, Collector<String> out) throws Exception {
            if (stateStore.get(key) == null) {
                stateStore.put(key, 1);
            } else {
                int previous = stateStore.get(key);
                stateStore.put(key, previous + 1);
            }
            out.collect("State store for " + elements.toString() + " is " + stateStore.entries().toString()
                    + " for window : " + context.window());
        }
    }
    static class Element {
        private final long timestamp;
        private final int value;
        public Element(long timestamp, int value) {
            this.timestamp = timestamp;
            this.value = value;
        }
        public long getTimestamp() {
            return timestamp;
        }
        public int getValue() {
            return value;
        }
        public static Element from(int value, long timestamp) {
            return new Element(timestamp, value);
        }
    }
}
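Here I try to count the number of times the process() function is called for a key. This example works, and the state is indeed preserved across tumbling windows. I made sure this example exactly mirrors the actual process window function, with the other unnecessary code removed.
But the state is not preserved across windows in the actual process window function!
Is there an obvious gotcha I am missing? Is there any other reason why a ProcessWindowFunction that uses a MapState defined as follows would fail to preserve state across TumblingEventTimeWindows: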
private MapState<UserDefinedEnum, Boolean> activeSessionStore;
@Override
public void open(Configuration parameters) throws Exception {
    activeSessionStore = getRuntimeContext().getMapState(new MapStateDescriptor<UserDefinedEnum, Boolean>(
            "name", UserDefinedEnum.class, Boolean.class));
}
Here is the actual class with the bloat removed, updated according to the suggestions from @David and @ShemTov:
public class IUFeatureStateCombiner extends ProcessWindowFunction<IUSessionMessage, IUSessionMessage, IUMonitorFeatureKey, TimeWindow> {
    private final static MapStateDescriptor<IUEventType, Boolean> desc = new MapStateDescriptor<IUEventType, Boolean>(
            "store", IUEventType.class, Boolean.class);
    private final Logger LOGGER = LoggerFactory.getLogger(IUFeatureStateCombiner.class);
    @Override
    public void process(IUMonitorFeatureKey iuMonitorFeatureKey, Context context, Iterable<IUSessionMessage> elements, Collector<IUSessionMessage> out) throws Exception {
        ...
        MapState<IUEventType, Boolean> activeSessionStore = context.globalState().getMapState(desc);
        Iterable<Entry<IUEventType, Boolean>> lastFeatureStates = activeSessionStore.entries(); // <-------- This returns an empty iterable
        // even though I populated activeSessionStore with some values in the previous invocation of process()
        ... do something based on lastFeatureStates....
        activeSessionStore.put(...);
    }
    @Override
    public void clear(Context context) throws Exception {
        context.globalState().getMapState(desc).clear();
    }
}
I invoke it like this:
inputStream.keyBy(IUSessionMessage::getMonitorFeatureKey)
        .window(TumblingEventTimeWindows.of(Time.milliseconds(1000L)))
        .process(new IUFeatureStateCombiner());
This still has the problem: I get an empty iterable in the second invocation of process(), even though I populated the state in the previous invocation.
EDIT: Problem solved. I should not have called the clear() method, because this is global state.
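You want to do something more like this. Keep in mind that these are per-key state stores (each key has its own separate map), so calling stateStore.get(key) doesn't really make sense. If you only need to store a single integer per key, ValueState is probably all you need.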
static class MyProcessWindowFn extends ProcessWindowFunction<Element, String, Integer, TimeWindow> {
    private static final MapStateDescriptor<Integer, Integer> mapDesc =
            new MapStateDescriptor<>("stateStore", Integer.class, Integer.class);
    @Override
    public void process(Integer key, Context context, Iterable<Element> elements, Collector<String> out) throws Exception {
        // globalState() is a method on Context; it returns state scoped to the current key
        MapState<Integer, Integer> stateStore = context.globalState().getMapState(mapDesc);
        ...
    }
}
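Following the ValueState suggestion, a minimal sketch of the question's invocation counter might look like the class below. It assumes a Flink 1.x release with the same ProcessWindowFunction API used in the question and flink-streaming-java on the classpath; the class name WindowCountFn and the state name "windowCount" are hypothetical. Because context.globalState() is already scoped to the current key, one ValueState<Integer> replaces the MapState keyed by key.

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

/**
 * Hypothetical example: counts how many windows have fired for each key.
 * Works for any input and key type because it never inspects the elements.
 */
public class WindowCountFn<IN, K> extends ProcessWindowFunction<IN, String, K, TimeWindow> {

    // Only the descriptor is kept; the state handle is looked up on each invocation.
    private static final ValueStateDescriptor<Integer> COUNT_DESC =
            new ValueStateDescriptor<>("windowCount", Integer.class);

    @Override
    public void process(K key, Context context, Iterable<IN> elements, Collector<String> out)
            throws Exception {
        // Global state is already per key, so no inner map keyed by `key` is needed.
        ValueState<Integer> count = context.globalState().getState(COUNT_DESC);
        Integer previous = count.value(); // null the first time a window fires for this key
        int updated = (previous == null) ? 1 : previous + 1;
        count.update(updated);
        out.collect("key " + key + " has fired " + updated + " window(s); latest " + context.window());
    }

    // Deliberately no clear() override: clearing global state there would reset the
    // count after every window.
}
```

In the minimal example it would be plugged in as .process(new WindowCountFn<Element, Integer>()).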
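Note that global state is never cleared automatically. So if you have an unbounded key space, you will eventually run into problems. You can handle this by configuring state TTL on the state descriptor.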
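For the TTL suggestion, here is one way to attach a TTL to a MapStateDescriptor. This is a sketch that assumes a Flink 1.x release where StateTtlConfig.newBuilder accepts org.apache.flink.api.common.time.Time (newer releases also provide a Duration-based builder); the helper class TtlDescriptors and its method withTtl are hypothetical names. Flink's state TTL is measured in processing time, not event time, and for MapState each map entry expires independently.

```java
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;

/** Hypothetical helper that builds TTL-enabled MapState descriptors. */
public final class TtlDescriptors {

    private TtlDescriptors() {}

    public static <K, V> MapStateDescriptor<K, V> withTtl(
            String name, Class<K> keyClass, Class<V> valueClass, Time ttl) {
        StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(ttl)
                // Restart an entry's expiry clock whenever it is created or written.
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                // Never return an entry that has expired but not yet been cleaned up.
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();

        MapStateDescriptor<K, V> desc = new MapStateDescriptor<>(name, keyClass, valueClass);
        desc.enableTimeToLive(ttlConfig);
        return desc;
    }
}
```

For example, desc in IUFeatureStateCombiner could be built with TtlDescriptors.withTtl("store", IUEventType.class, Boolean.class, Time.hours(24)), where 24 hours is only a placeholder. The Flink 1.x documentation notes that restoring state written without TTL using a TTL-enabled descriptor (or vice versa) fails with a StateMigrationException, so it is easiest to decide on TTL before the first deployment.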
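My mistake was using the clear() method incorrectly. Since this is global state, calling clear() wiped the state every time a TumblingWindow expired. As David pointed out, global state is never cleared automatically, so for a stream with an unbounded key space we have to define a TTL.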
As far as I know, you cannot get the global state in the overridden open() method.
You need to get it inside the process() method of the ProcessWindowFunction:
context.globalState().getMapState(<your_Map_State_Descriptor>)
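I noticed that in your minimum working example you create stateStore in the open function and use stateStore directly in the process function. In the so-called actual class, you create activeSessionStore in the open function but use context.globalState().getMapState(desc) to fetch the state. As I understand it, you never use the state you created in open and always use the global state. That is why the store becomes empty when you add the clear function, and works as expected once you remove clear. But you have actually already verified in your example that you can achieve what you want without global state, like this: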
...
@Override
public void open(Configuration parameters) throws Exception {
    activeSessionStore = getRuntimeContext().getMapState(new MapStateDescriptor<IUEventType, Boolean>(
            "name", IUEventType.class, Boolean.class));
}
...
@Override
public void process(IUMonitorFeatureKey iuMonitorFeatureKey, Context context, Iterable<IUSessionMessage> elements, Collector<IUSessionMessage> out) throws Exception {
    ...
    Iterable<Entry<IUEventType, Boolean>> lastFeatureStates = activeSessionStore.entries(); // <-------- Finally used the one you created in open function
    ... do something based on lastFeatureStates....
    activeSessionStore.put(...);
}