我有一个名为AddCashProcessor
的processor
。
在AddCashProcessor
,我维护了一个KeyValueStore<String, HashSet<String>>
来记录谁通过process
方法支付物品的用户ID。代码是这样的:
@Override
public void process(String key, String value) {
HashSet<String> set = Optional.ofNullable(store.get(key)).orElse(new HashSet<>());
set.add(value);
store.put(key, set);
}
在punctuate
名为AddCashPunctuator
的类的方法中,implements the Punctuator interface
,我得到了将其插入MySQL的HashSet.size()
:
@Override
public void punctuate(long l) {
List<String> updateSqls = new ArrayList<>();
KeyValueIterator<String, HashSet<String>> iter = store.all();
while (iter.hasNext()) {
KeyValue<String, HashSet<String>> entry = iter.next();
int size = entry.getValue().size();
....
}
}
iter.close();
MySqlUtils.update(updateSqls);
}
AddCashPunctuator
在AddCashProcessor
init
方法中注册,如下所示:
@Override
public void init(ProcessorContext context) {
this.context = context;
....
this.context.schedule(30000L, PunctuationType.WALL_CLOCK_TIME, new AddCashPunctuator());
}
我想每 30 秒执行一次 punctuate
方法。但它不是这样做的。有时运行良好,有时暂停。当暂停结束时,执行多次。
这是为什么呢?是因为HashSet
或KeyValueStore
的数据太大吗?我的卡夫卡流版本是 1.0.0。 我的卡夫卡版本是 0.10.1.1。
谢谢!
在 Kafka Streams 中,有一个线程负责常规处理和标点符号。因此,如果处理时间超过标点符号计划,则对标点符号的调用可能会延迟。与此无关,GC 暂停可能会延迟标点符号。
因此,标点符号(对于任何具有 GC 暂停的系统(是尽力而为的(即使有专用的标点符号线程(。
不幸的是,如果错过了标点符号,Kafka Streams 会重播所有错过的标点符号。这就是为什么你一次得到多个的原因。已经修复了即将发布的 1.1 版本 (https://issues.apache.org/jira/browse/KAFKA-6323(。