我们在应用程序中使用spring kafka 2.3.0。已经观察到在以下场景中的一些处理故障
@Service
@EnableScheduling
public class KafkaService {
public void sendToKafkaProducer(String data) {
kafkaTemplate.send(configuration.getProducer().getTopicName(), data);
}
@KafkaListener(id = "consumer_grpA_id",
topics = "#{__listener.getEnvironmentConfiguration().getConsumer().getTopicName()}", groupId = "consumer_grpA", autoStartup = "false")
public void onMessage(ConsumerRecord<String, String> data) throws Exception {
passA(data);
}
private void passB(String message) {
//counter to keep track of retry attempts
if (counter.containsKey(message.getEventID())) {
//RETRY_COUNT = 5
if (counter.get(message.getEventID()) < RETRY_COUNT) {
retryAgain(message);
}
} else {
firstRetryPass(message);
}
}
private void retryAgain(String message) {
counter.put(message.getEventID(), counter.get(message.getEventID()) + 1);
try {
registry.stop(); //pause the listener
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
private void firstRetryPass(String message) {
// First Time Entry for count and time
counter.put(message.getEventID(), 1);
try {
registry.stop();//pause the listener
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
private void passA(String message) {
try {
passToTarget(message); //Call target processor
LOGGER.info("Message Processed Successfully to the target");
} catch (Exception e) {
targetUnavailable= true;
passB(message);
}
}
private void passToTarget(String message){
//processor logic, if the target is not available, retry after 15 mins, call passB func
}
@Scheduled(cron = "0 0/15 * 1/1 * ?")
public void scheduledMethod() {
try {
if (targetUnavailable) {
registry.start();
firstTimeStart = false;
}
LOGGER.info(">>>Scheduler Running ?>>>" + registry.isRunning());
} catch (Exception e) {
LOGGER.error(e.getMessage());
}
}
}
- 在处理间隔后收到第一条消息时,消费者不会收到第一条信息。随后的消息将被处理
- 由于我们无法直接访问卡夫卡主题,我们无法确定没有从消费者那里获得的过程
- 我们如何追踪那些没有被发现的事件?为什么会这样
- 我们还配置了一个调度器,其工作是保持Kafka的注册表运行。那么,当我们已经配置了侦听器时,是否需要这个调度器
- 如果我们保持侦听器运行,内存和CPU利用率指标是什么。这就是我们使用Kafka注册表在目标关闭时显式停止侦听器的原因之一。因此,需要验证这种方法是否可持续。我的直觉是,这违背了Listener的基本工作,因为它的主要工作是继续侦听新事件,而不管目标状态如何编辑/strong>*
-
除非使用
stop(Runnable)
,否则不应该停止侦听器线程上的注册表,否则会出现死锁和延迟,因为容器会等待侦听器退出。 -
在处理完上次轮询获取的所有剩余记录之前(除非设置了
max.poll.records=1
.(,停止容器(通过注册表(实际上不会生效 -
当侦听器正常退出时,将提交记录的偏移量,这样记录就不会在下一次启动时重新传递。
您可以将ContainerStoppingErrorHandler
用于此用例。请参见此处。
抛出一个异常,错误处理程序将为您停止容器。
但这将在第一次尝试时停止容器。
如果要重试,请使用SeekToCurrentErrorHandler
,并在重试次数用完后从恢复器调用ContainerStoppingErrorHandler
。