我正在使用spring-kafka
2.3.7
问题1:系统中有两个消费者(KafkaListeners
(。当应用程序启动时,其中一个Consumers
总是指向相同的Topic/Partition/OffSet(s)
。这可能是由于消费者重新平衡。
问题2:在(1)
之后,相同的Consumer
也不能使用发送到Topic
的任何新的Message
日志:
[tConsumer-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator [ ] : [Consumer clientId=consumer-1, groupId=enrollmentConsumer] Setting offset for partition bev3_enrollment_sync_topic_dev-1 to the committed offset
FetchPosition{offset=79, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=b-1.cpsdmskclustertestenv.7hij1q.c2.kafka.us-east-2.amazonaws.com:9094 (id: 1 rack: use2-az1), epoch=3}}
[tConsumer-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator [ ] : [Consumer clientId=consumer-1, groupId=enrollmentConsumer] Setting offset for partition bev3_enrollment_sync_topic_dev-0 to the committed offset
FetchPosition{offset=73, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=b-2.cpsdmskclustertestenv.7hij1q.c2.kafka.us-east-2.amazonaws.com:9094 (id: 2 rack: use2-az2), epoch=3}}
然而,在改变group-id
之后,它被解决了。但我很想知道旧的Offset(s)
如何与旧的group-id
一起提交。我正在使用STCH
、ACKMODE.MANUAL_IMMEDIATE
和offset->LATEST
,谢谢。
将赋值提交容器属性设置为NEVER
/**
* Set the assignment commit option. Default
* {@link AssignmentCommitOption#LATEST_ONLY_NO_TX}.
* @param assignmentCommitOption the option.
* @since 2.3.6
*/
public void setAssignmentCommitOption(AssignmentCommitOption assignmentCommitOption) {
默认情况下,如果自动偏移重置为latest
,则容器在分配期间提交当前位置。