在 kafka consumption 中获取"关闭执行器服务"(org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler)



我有时会在关闭ExecutorService然后Consumer停止时出错。Kafka事件被再次处理,但重复出现,因为该事件在存储数据之前已经被处理。我已经用幂等性修正了重复。但有没有办法防止这种关闭?指数退避策略的性质:

backOffPolicy.setMaxInterval(60000);
backOffPolicy.setMultiplier(2.0);
backOffPolicy.setInitialInterval(1000);
simpleRetryPolicy.setMaxAttempts(60);

这是因为您有一个很大的重试间隔,并且使用者线程被阻塞;由于错误处理程序支持回退和异常分类,因此不赞成在侦听器适配器中重试。每当容器停止时,错误处理程序也可以退出重试。

您使用适当配置的DefaultErrorHandler(2.8及更高版本(或SeekToCurrentErrorHandler用于早期版本(2.7.10之前的版本不再支持OSS用户(。

相关内容

最新更新