Kafka流每次迭代执行多个标点符号



我有一个带有时间表的转换器

context.schedule(scanFrequency, PunctuationType.WALL_CLOCK_TIME, new MyPunctuator(stateStore));

然后我的标点符号类

public class MyPunctuator implements Punctuator {
@Override
public void punctuate(final long timestamp) {
}
}

现在奇怪的是,当时间表工作时,每次迭代调用4次的标点符号

[StreamThread-1] INFO MyPunctuator  - [Punctuator Scan] - Timestamp 1660083164829
[StreamThread-1] INFO MyPunctuator  - store=0
[StreamThread-1] INFO MyPunctuator  - [Punctuator Scan] - Timestamp 1660083164830
[StreamThread-1] INFO MyPunctuator  - store=1
[StreamThread-1] INFO MyPunctuator  - [Punctuator Scan] - Timestamp 1660083164831
[StreamThread-1] INFO MyPunctuator  - store=0
[StreamThread-1] INFO MyPunctuator  - [Punctuator Scan] - Timestamp 1660083164832
[StreamThread-1] INFO MyPunctuator  - store=0

知道为什么吗?

根据标点符号类型,这就是Schedule-标点符号的工作方式。您可以将此示例与您的用例进行比较。

标点符号类型。STREAM_TIME

如果您根据标点符号类型每10秒安排一次标点符号功能。STREAM_TIME,如果您处理一个由60条记录组成的流,这些记录的连续时间戳从1(第一条记录(到60秒(最后一条记录(,则会调用pump((6次。无论实际处理这些记录所需的时间如何,都会发生这种情况。无论处理这60条记录需要一秒钟、一分钟还是一小时,都会调用punct((6次。

标点符号类型。WALL_CLOCK_TIME

当使用挂钟时间(即标点类型.wall_clock_time(时,标点((完全由挂钟时间触发。如果"标点符号"函数是基于"标点类型"安排的,请重复使用上面的示例。WALL_CLOCK_TIME,如果这60条记录在20秒内被处理,则会调用2次(每10秒调用一次(。如果这60条记录是在5秒内处理的,那么就根本不会调用spo点((。请注意,通过在init((方法内多次调用ProcessorContext#schedule((,可以在同一处理器内调度具有不同标点类型的多个标点符号回调。

Transformer被初始化了一些随机次数。我有4个线程,一个主题,2个分区,我有10个标点符号。我不明白。

相关内容

  • 没有找到相关文章

最新更新