在Flink 1.8.1中,我尝试将State TTL应用于BroadcastState(使用MapStateDescriptor(,如下所示:
(Holder 是一个 POJO 包装了一个私有的 int 变量 "deger"(
...
StreamExecutionEnvironment envStream = StreamExecutionEnvironment.getExecutionEnvironment();
StateBackend stateBackend = new FsStateBackend("file://.....");
envStream.setStateBackend(stateBackend);
envStream.enableCheckpointing(1_000L, CheckpointingMode.EXACTLY_ONCE);
...
MapStateDescriptor<Integer, Client> clientMapStateDescriptor = new MapStateDescriptor<>(
"ClientBroadcastState",
BasicTypeInfo.INT_TYPE_INFO,
TypeInformation.of(new TypeHint<Client>() {})
);
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(3))
// .cleanupFullSnapshot()
// .cleanupInBackground()
.cleanupIncrementally(100, false)
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
clientMapStateDescriptor.enableTimeToLive(ttlConfig);
DataStream<Client> clientDataStream = envStream.fromCollection(clientList);
// clientDataStream.print("clientDataStream");
BroadcastStream<Client> clientBroadcastStream = clientDataStream
.broadcast(clientMapStateDescriptor);
List<Holder> holderList = new ArrayList<>(count);
for(int i = 0; i < count; i++) {
holderList.add(new Holder(i));
}
DataStream<Holder> integerHolderDataStream = envStream.fromCollection(holderList);
BroadcastConnectedStream<Holder, Client> connectedStreams = integerHolderDataStream
.keyBy("deger")
.connect(clientBroadcastStream);
SingleOutputStreamOperator<Row> operator = connectedStreams.process(new KeyedBroadcastProcessFunction<Integer, Holder, Client, Row>() {
@Override
public void processElement(Holder value, ReadOnlyContext ctx, Collector<Row> out) throws Exception {
for (Map.Entry<Integer, Client> entry : ctx.getBroadcastState(clientMapStateDescriptor).immutableEntries()) {
Client c = ctx.getBroadcastState(clientMapStateDescriptor).get(entry.getKey());
System.out.println(value.getDeger() + " - " + c);
}
Thread.sleep(1000L);
}
@Override
public void processBroadcastElement(Client value, Context ctx, Collector<Row> out) throws Exception {
ctx.getBroadcastState(clientMapStateDescriptor).put(value.getId(), value);
}
});
...
holderList 有足够的实例来测试状态中的条目是否被逐出。
但是BroadcastState
中的条目不会过期。
我尝试过的事情:
- 使用不同的状态后端 (
FsStateBackend
( - 启用检查点
- 显式访问映射状态值
我可能做错了什么?BroadcastState 是否支持 StateTTL?
如果没有,您能否提供一个示例来说明如何逐出 BroadcastState 中的条目(使用 MapStateDescriptor(?
根据 FLIP-25 中的说法,StateTTL 仅适用于键控状态。
存储在 BroadcastState 中的项只能在 BroadcastProcessFunction(或键控 BroadcastProcessFunction(的 processBroadcastElement 方法中写入或清除 - 这意味着您必须将其作为处理另一个广播元素接收的一部分。而且你需要注意在所有并行实例中使用完全相同的逻辑,因为 Flink 希望每个实例在 BroadcastState 的内容上保持一致,如果你在这里做任何非确定性或特定于实例的事情,可能会导致奇怪的事情。
一种解决方案是广播流记录,这些记录被收件人解释为命令,以使广播状态中的早期记录过期。