我见过一个实现SinkFunction的类的Apache Flink程序,该程序在没有适当同步原语的情况下定期上传数据。
它被认为是危险的吗?
由于api读取
"将给定值写入接收器。每个记录都会调用此函数">
我假设给定的函数可以由给定同一实例的多个线程调用,这可能会在没有潜在锁定机制或并发数据结构的情况下导致竞争条件。这是正确的解释吗?
private List<Record> bufferedRecords;
@Override
public void invoke(Point point, Context context) throws Exception {
bufferedRecords.add(point);
if (bufferedRecords.size() == batchSize) {
writeRecords(bufferedRecords);
bufferedRecords.empty();
}
}
后续:为了使调用线程安全,我认为将整个函数封装在一个锁上似乎就足够了。在不牺牲bufferedRecords
不能越过batchSize
并且没有遗漏或重复记录的特性的情况下,有没有更好的方法来处理这种情况?
Flink中的所有用户定义函数仅由同一线程调用。通常每个子任务/线程都有一个这样的函数的副本(通过Serializable(,以避免代价高昂的同步。
所以你的水槽功能是安全的。但是,当缓存值时,如果依赖Flink的容错来获得确切的结果,则需要确保将它们置于该状态。如果您使用检查点,您还应该注意,只要bufferedRecords.empty()
块,就不能执行任何检查点。