被问到如何手动控制使用骆驼卡夫卡的偏移量?我想用骆驼卡夫卡手动承诺偏移。我的路线:
.from(kafka:topic1)
.aggregate(new GroupByExchangeStrategy())
.to(kafka:topic2)
.process(new ManualCommitProcessor())
,在将消息发送到另一个主题后,ManualCommitProcessor
将执行承诺。
问题在于,聚合器和Kafka生产商正在与负责抵消承诺的Kafka消费者分开工作。因此,我以
结尾java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
聚合后有可能再次致电消费者线程并派遣提交偏移?
否这是不可能的,消费者线程独立于聚合器的输出。