我需要为每个键保持最后25秒的值处于flink map状态,但TTL在达到25秒时会删除所有值。请参阅我的代码,在代码列表中,每秒保存每个传感器id的传入数据,为了减少内存存储,我必须在列表中只保留25秒的数据。有什么方法可以实现这一点吗?TTL清除整个列表。
public class ContinousDataProcessor
extends KeyedProcessFunction<String,SensorData,Tuple2<String,Integer>> {
private transient MapState<String, List<SensorData>> SensorValueMapState;
private static final long serialVersionUID = 1L;
@Override
public void open(Configuration config) {
MapStateDescriptor<String, List<SensorData>> varibaleTagValueMapDescriptor = new MapStateDescriptor(
"variableTagValueMapState", String.class, SensorData.class);
StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.seconds(25))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired).build();
varibaleTagValueMapDescriptor.enableTimeToLive(ttlConfig);
SensorValueMapState= getRuntimeContext().getMapState(varibaleTagValueMapDescriptor);
}
@Override
public void processElement(SensorData inputData, Context arg1, Collector arg2) throws Exception {
if (SensorValueMapState.contains(inputData.sensorId)) {
SensorValueMapState.get(inputData.sensorId).add(inputData);
} else {
List<SensorData> sensorDataList = new ArrayList<>();
sensorDataList.add(inputData);
SensorValueMapState.put(inputData.sensorId, sensorDataList);
}
for (SensorData str : SensorValueMapState.get(inputData.sensorId)) {
System.out.println(str.eventTime);
}}
据我所知,您希望对列表中的每个元素应用TTL。在您的情况下,该列表是处于映射状态的值。映射状态不了解映射状态中用户值的结构。这是对状态后端中的数据布局的限制。因此,在当前实现中,不可能对每个元件应用TTL。
TTL应用于值状态下的每个用户值、列表状态下的每用户元素以及映射状态下的各用户键/值对。
根据应用程序的要求,您可以尝试使用复合密钥列出状态:
key of KeyedProcessFunction = current key of your KeyedProcessFunction + your current map state key
这不允许通过KeyedProcessFunction的当前键轻松获取所有列表,尽管您现在可以这样做。