我有一个包含AbstractProcessor(实际上是两个)的Kafka Streams拓扑。 在其中之一中,我将Punctuation API
与WALL_CLOCK_TIME
一起使用来计划刷新处理所需的一些参考数据。 我在任务开始时这样做,然后每隔一段时间(假设 1 小时)安排一次。num.stream.threads
配置为 2。
例如,我有一些这样的代码:
def loadReferenceData() = {
logger.info("Loading All Reference Data...")
// atomically (re)load some data
}
override def init(context: ProcessorContext) = {
super.init(context)
logger.info("Loading reference data initially...")
loadReferenceData()
context.schedule(1000 * reloadDataSeconds, PunctuationType.WALL_CLOCK_TIME, (timestamp) => {
loadReferenceData()
context.commit(); // Unsure if necessary
});
}
如果没有传入记录,在正在运行的应用程序的单个实例的日志中,我可以看到 init 的这些日志:
[2019-06-11 08:54:19,518] INFO Loading All Reference Data... (com.divvit.dp.streams.applications.StreamProcessor)
[2019-06-11 08:53:31,080] INFO Loading All Reference Data... (com.divvit.dp.streams.applications.StreamProcessor)
[2019-06-11 08:53:29,713] INFO Loading All Reference Data... (com.divvit.dp.streams.applications.StreamProcessor)
[2019-06-11 08:53:29,682] INFO Loading All Reference Data... (com.divvit.dp.streams.applications.StreamProcessor)
[2019-06-11 07:54:20,855] INFO Loading All Reference Data... (com.divvit.dp.streams.applications.StreamProcessor)
[2019-06-11 07:54:19,714] INFO Loading All Reference Data... (com.divvit.dp.streams.applications.StreamProcessor)
[2019-06-11 07:54:19,516] INFO Loading All Reference Data... (com.divvit.dp.streams.applications.StreamProcessor)
[2019-06-11 07:53:31,036] INFO Loading All Reference Data... (com.divvit.dp.streams.applications.StreamProcessor)
[2019-06-11 07:53:29,668] INFO Loading All Reference Data... (com.divvit.dp.streams.applications.StreamProcessor)
[2019-06-11 07:53:29,653] INFO Loading All Reference Data... (com.divvit.dp.streams.applications.StreamProcessor)
[2019-06-11 06:54:20,845] INFO Loading All Reference Data... (com.divvit.dp.streams.applications.StreamProcessor)
[2019-06-11 06:54:19,726] INFO Loading All Reference Data... (com.divvit.dp.streams.applications.StreamProcessor)
所以似乎有许多日志用于输入loadReferenceData
,每小时一次。我希望每小时只能看到 2 个条目(2 个线程),但还有更多(通常为 6 个)。
在日志中,我只在应用程序创建之初看到了 6 次处理器的创建:
[2019-06-10 16:54:19,849] INFO Loading reference data initially... (com.divvit.dp.streams.applications.StreamProcessor)
[2019-06-10 16:54:18,231] INFO Loading reference data initially... (com.divvit.dp.streams.applications.StreamProcessor)
[2019-06-10 16:54:17,874] INFO Loading reference data initially... (com.divvit.dp.streams.applications.StreamProcessor)
[2019-06-10 16:53:29,675] INFO Loading reference data initially... (com.divvit.dp.streams.applications.StreamProcessor)
[2019-06-10 16:53:27,132] INFO Loading reference data initially... (com.divvit.dp.streams.applications.StreamProcessor)
[2019-06-10 16:53:24,923] INFO Loading reference data initially... (com.divvit.dp.streams.applications.StreamProcessor)
所以这是有道理的:处理器创建一次,每小时更新一次。
但是,当我在应用程序中投入更多负载时,我经常可以看到新的处理器对象的创建。
- 在什么情况下,Kafka Streams 会创建这些处理器的新实例?
- 如何知道我的应用程序实例将创建多少个处理器实例?
- 如果处理器可以由 Kafka Streams 自行决定关闭/创建,那么对于这些"外部"操作,标点符号 API 看起来是矫枉过正的(或者只是不是为此而设计的),单独的定期更新线程会更好地解决问题,不是吗?
Kafka-Streams 将在输入主题上为每个分区创建一个处理器,每个处理器都有自己的调度。(如果您使用状态存储,这实际上非常有用,因为状态也将进行分区。
如果要对内部状态应用常规操作,则调度程序非常有用。它可以很好地停止常规处理,并确保您在计划操作期间所做的一切都是一致的。如果手头的任务与流式处理本身无关,那么单独的线程可能同样好。
如果选择单独的线程,请确保在 kafka 流线程崩溃时适当地终止它。否则,应用程序将挂起在计时器线程上,但不消耗任何 kafka 消息。
增加线程数(num.stream.threads
)意味着多个分区将同时被消耗。它与彼此相邻启动多个实例具有相同的行为。请参阅 https://docs.confluent.io/current/streams/architecture.html#threading-model