Kafka Listener并发-如何处理从6个线程触发的Stop/Idle事件



我每天/每周有两个卡夫卡听众。Daily具有autoStartup=true和Weekly的autoStartup为false。我有一个端点来停止正在运行的Daily并启动Weekly。一旦Weekly消费完消息,我就等待空闲事件(设置为1分钟(触发,在那里我停止Weekly。现在我正在收听《周刊》上的"停止"活动,在这里我开始了《每日邮报》。现在的问题是我将并发设置为6。所以我得到了6个空闲事件和6个停止事件。我的处理方式如下。我想知道这是一个好的做法,还是有更好的做法?

我在ConcurrentHashMap中收集所有每日停止事件。一旦达到concurreny计数,这意味着所有6个每日侦听器线程都将停止,并可以启动Weekly侦听器。

private void processDailyStopEvent(ConsumerStoppedEvent event)
{
LOGGER.info("Processing DAILY Stop events");
KafkaMessageListenerContainer source = (KafkaMessageListenerContainer) event.getSource();
ConcurrentMessageListenerContainer container = (ConcurrentMessageListenerContainer) event.getContainer(ConcurrentMessageListenerContainer.class);
eventMap.get(“dailyStop”).add(source.getListenerId());
LOGGER.info("Added ListenerId {} to Map<dailyStop>", source.getListenerId());
if (eventMap.get(“dailyStop”).size() == container.getConcurrency()) {
LOGGER.info("All DAILY Stop events are captured. Clearing the Map<dailyStop>");
eventMap.get(“dailyStop”).clear();
LOGGER.info("Starting WEEKLY Consumer now.");
kafkaService.startWeeklyConsumer();
}
}

您所拥有的是一种合理的方法。

或者,您可以只等待并发容器的停止事件(其中源等于容器(——它是在所有子容器停止后发布的。

最新更新