我有一个带有无界流的flink(v1.13.3(应用程序(使用kafka(。我的一条小溪很忙。繁忙值(我可以在UI上看到(也会随着时间的推移而增加。当我刚开始使用flink应用程序时:
sum by(task_name) (flink_taskmanager_job_task_busyTimeMsPerSecond{job="Flink", task_name="MyProcessFunction"})
返回300-450毫秒- 在5个++小时之后,CCD_ 2返回5-7 sn
这个函数非常简单,它只使用rocksdb作为状态后端:
public class MyObj implements Serializable
{
private Set<String> distinctValues;
public MyObj()
{
this.distinctValues = new HashSet<>();
}
public Set<String> getDistinctValues() {
return distinctValues;
}
public void setDistinctValues(Set<String> values) {
this.distinctValues = values;
}
}
public class MyProcessFunction extends KeyedProcessFunction<String, KafkaRecord, Output>
{
private transient ValueState<MyObj> state;
@Override
public void open(Configuration parameters)
{
ValueStateDescriptor<MyObj> stateDescriptor = new ValueStateDescriptor<>("MyObj",
TypeInformation.of(MyObj.class));
state = getRuntimeContext().getState(stateDescriptor);
}
@Override
public void processElement(KafkaRecord value, Context ctx, Collector<Output> out) throws Exception
{
MyObj stateValue = state.value();
if (stateValue == null)
{
stateValue = new MyObj();
ctx.timerService().registerProcessingTimeTimer(value.getTimestamp() + 10mins);
}
stateValue.getDistinctValues().add(value.getValue());
if (stateValue.getDistinctValues().size() >= 20)
{
state.clear();
}
else
{
state.update(stateValue);
}
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Output> out)
{
state.clear();
}
}
注意:在实现valueState之前,我只是在使用ListState。但与listState一起使用flink_taskmanager_job_task_busyTimeMsPerSecond
会返回25-30sn:
public class MyProcessFunction extends extends KeyedProcessFunction<String, KafkaRecord, Output>
{
private transient ListState<String> listState;
@Override
public void open(Configuration parameters)
{
ListStateDescriptor<String> listStateDescriptor = new ListStateDescriptor<>("myobj", TypeInformation.of(String.class));
listState = getRuntimeContext().getListState(listStateDescriptor);
}
@Override
public void processElement(KafkaRecord value, Context ctx, Collector<KafkaRecord> out) throws Exception
{
List<String> values = IteratorUtils.toList(listState.get().iterator());
if (CollectionUtils.isEmpty(values))
{
ctx.timerService().registerProcessingTimeTimer(value.getTimestamp() + 10min);
}
if (!values.contains(value.getValue()))
{
values.add(value.getValue());
listState.update(values);
}
if (values.size() >= 20)
{
...
}
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<KafkaRecord> out)
{
listState.clear();
}
}
一旦RocksDB达到工作状态不再适合内存的程度,预计会出现一些放缓。但是,在这种情况下,您应该能够通过从ValueState
切换到MapState
来显著提高性能。
目前,您正在对每条记录的整个hashSet进行反序列化和保留。随着时间的推移,这些hashSet会增长,性能会下降。
RocksDB状态后端具有MapState
的优化实现。映射中的每个键/值条目都存储为一个单独的RocksDB对象,因此您可以查找、插入和更新条目,而无需对映射的其余部分执行serde操作。
ListState
还针对RocksDB进行了优化(可以在不反序列化列表的情况下将其附加到(。通常,在使用RocksDB时,最好避免将集合存储在ValueState
中,并尽可能使用ListState
或MapState
。
由于基于堆的状态后端将其工作状态保持为堆上的对象,因此不会出现同样的问题。