使用者不会从主题中获取新记录,每次启动时总是指向相同的偏移位置



我正在使用spring-kafka2.3.7

问题1:系统中有两个消费者(KafkaListeners(。当应用程序启动时,其中一个Consumers总是指向相同的Topic/Partition/OffSet(s)。这可能是由于消费者重新平衡。

问题2:(1)之后,相同的Consumer也不能使用发送到Topic的任何新的Message

日志:

[tConsumer-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  [                                    ] : [Consumer clientId=consumer-1, groupId=enrollmentConsumer] Setting offset for partition bev3_enrollment_sync_topic_dev-1 to the committed offset 
FetchPosition{offset=79, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=b-1.cpsdmskclustertestenv.7hij1q.c2.kafka.us-east-2.amazonaws.com:9094 (id: 1 rack: use2-az1), epoch=3}}
[tConsumer-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  [                                    ] : [Consumer clientId=consumer-1, groupId=enrollmentConsumer] Setting offset for partition bev3_enrollment_sync_topic_dev-0 to the committed offset 
FetchPosition{offset=73, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=b-2.cpsdmskclustertestenv.7hij1q.c2.kafka.us-east-2.amazonaws.com:9094 (id: 2 rack: use2-az2), epoch=3}}

然而,在改变group-id之后,它被解决了。但我很想知道旧的Offset(s)如何与旧的group-id一起提交。我正在使用STCHACKMODE.MANUAL_IMMEDIATEoffset->LATEST,谢谢。

将赋值提交容器属性设置为NEVER

/**
* Set the assignment commit option. Default
* {@link AssignmentCommitOption#LATEST_ONLY_NO_TX}.
* @param assignmentCommitOption the option.
* @since 2.3.6
*/
public void setAssignmentCommitOption(AssignmentCommitOption assignmentCommitOption) {

默认情况下,如果自动偏移重置为latest,则容器在分配期间提交当前位置。

最新更新