如何使用Camel-Kafka提交偏移螺纹安全



被问到如何手动控制使用骆驼卡夫卡的偏移量?我想用骆驼卡夫卡手动承诺偏移。我的路线:

.from(kafka:topic1)
 .aggregate(new GroupByExchangeStrategy())
.to(kafka:topic2)
 .process(new ManualCommitProcessor())

,在将消息发送到另一个主题后,ManualCommitProcessor将执行承诺。

问题在于,聚合器和Kafka生产商正在与负责抵消承诺的Kafka消费者分开工作。因此,我以

结尾
java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access

聚合后有可能再次致电消费者线程并派遣提交偏移?

否这是不可能的,消费者线程独立于聚合器的输出。

最新更新