Kafka Streams 低级处理器 API 的标点符号不会定期运行



我有一个名为AddCashProcessorprocessor

AddCashProcessor,我维护了一个KeyValueStore<String, HashSet<String>>来记录谁通过process方法支付物品的用户ID。代码是这样的:

@Override
public void process(String key, String value) {
    HashSet<String> set = Optional.ofNullable(store.get(key)).orElse(new HashSet<>());
    set.add(value);
    store.put(key, set);
}

punctuate名为AddCashPunctuator的类的方法中,implements the Punctuator interface,我得到了将其插入MySQL的HashSet.size()

@Override
public void punctuate(long l) {
    List<String> updateSqls = new ArrayList<>();
    KeyValueIterator<String, HashSet<String>> iter = store.all();
    while (iter.hasNext()) {
        KeyValue<String, HashSet<String>> entry = iter.next();
            int size = entry.getValue().size();
            ....
        }
    }
    iter.close();
    MySqlUtils.update(updateSqls);
}

AddCashPunctuatorAddCashProcessor init方法中注册,如下所示:

@Override
public void init(ProcessorContext context) {
    this.context = context;
    ....
    this.context.schedule(30000L, PunctuationType.WALL_CLOCK_TIME, new AddCashPunctuator());
}

我想每 30 秒执行一次 punctuate 方法。但它不是这样做的。有时运行良好,有时暂停。当暂停结束时,执行多次。

这是为什么呢?是因为HashSetKeyValueStore的数据太大吗?我的卡夫卡流版本是 1.0.0。 我的卡夫卡版本是 0.10.1.1。

谢谢!

在 Kafka Streams 中,有一个线程负责常规处理和标点符号。因此,如果处理时间超过标点符号计划,则对标点符号的调用可能会延迟。与此无关,GC 暂停可能会延迟标点符号。

因此,标点符号(对于任何具有 GC 暂停的系统(是尽力而为的(即使有专用的标点符号线程(。

不幸的是,如果错过了标点符号,Kafka Streams 会重播所有错过的标点符号。这就是为什么你一次得到多个的原因。已经修复了即将发布的 1.1 版本 (https://issues.apache.org/jira/browse/KAFKA-6323(。

相关内容

  • 没有找到相关文章

最新更新