Handling duplicates across different partitions and offsets in a Kafka consumer application



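We are trying to consume records from a producer. The topic currently has 2 partitions, but that number may grow in the future to improve our throughput. We consume the records with 2 consumer threads, but we are getting duplicates. Our producer says they also included a key, but that did not solve the problem. We are not sure why.

On the consumer side, however, the duplicates cause our whole process cycle to be triggered twice, which is what we want to avoid. Our concern is that adding partitions in the future will also add duplicates.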

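Our process cycle:

Get the record from the stream --> upsert it into the table based on the key --> pull the record based on the key and insert it into the table --> call the API and update the record

Logs: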

coming from stream :004582777into offset 500405and partition 0
coming from stream :004582777into offset 499525and partition 1
skipping tax id 004582777
skipping tax id 004582777
coming from stream :002402419into offset 499526and partition 1
coming from stream :002402419into offset 500406and partition 0
skipping tax id 002402419
skipping tax id 002402419
coming from stream :010546936into offset 499527and partition 1
coming from stream :010546936into offset 500407and partition 0
skipping tax id 010546936
skipping tax id 010546936
coming from stream :010646378into offset 500408and partition 0
coming from stream :010646378into offset 499528and partition 1
skipping tax id 010646378
skipping tax id 010646378
coming from stream :010866219into offset 500409and partition 0
coming from stream :010866219into offset 499529and partition 1
skipping tax id 010866219
skipping tax id 010866219
coming from stream :019541747into offset 499530and partition 1
coming from stream :019541747into offset 500410and partition 0
skipping tax id 019541747
skipping tax id 019541747
coming from stream :020438119into offset 500411and partition 0
coming from stream :020438119into offset 499531and partition 1
skipping tax id 020438119
skipping tax id 020438119
coming from stream :020594385into offset 499532and partition 1
coming from stream :020594385into offset 500412and partition 0
skipping tax id 020594385
skipping tax id 020594385
coming from stream :043514479into offset 500413and partition 0
coming from stream :043514479into offset 499533and partition 1
skipping tax id 043514479
skipping tax id 043514479
coming from stream :030446242into offset 500414and partition 0
coming from stream :030446242into offset 499534and partition 1
record count is more than zero :1 for tax id:030446242 <--- we are calling the API 2 times because of the 2 occurrences
record count is more than zero :1 for tax id:030446242

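Even if we get duplicates from different partitions, how can we make sure that only one occurrence of the record is picked up? Because the two records are processed in parallel by the 2 consumer threads, for some records both instances end up in the table, while for others only one does.

Code: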

@KafkaListener(topics = "${app.topic}", groupId = "${app.group_id_config}")
public void run(ConsumerRecord<String, GenericRecord> record, Acknowledgment acknowledgement) throws Exception {
    try {
        // Pull the fields of interest out of the Avro record.
        // (These variables are assigned without a declaration here, so they are
        // presumably fields of the enclosing class.)
        prov_tin_number         = record.value().get("providerTinNumber").toString();
        prov_tin_type           = record.value().get("providerTINType").toString();
        enroll_type             = record.value().get("enrollmentType").toString();
        vcp_prov_choice_ind     = record.value().get("vcpProvChoiceInd").toString();
        error_flag              = "";

        // Upsert the row, recording the partition and offset the record came from.
        dataexecutor.peStremUpsertTbl(prov_tin_number, prov_tin_type, enroll_type, vcp_prov_choice_ind, error_flag,
                record.partition(), record.offset());

        // Acknowledge manually, only after the upsert has completed.
        acknowledgement.acknowledge();
    } catch (Exception ex) {
        // On failure the record is logged and not acknowledged.
        System.out.println(record);
        System.out.println(ex.getMessage());
    }
}

"getting duplicates from different partitions"

Kafka knows nothing about the data; you will get all records at all partitions/offsets.
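On why the producer's key did not help: a key only decides which partition a record is routed to; it does not make Kafka drop a second copy. The sketch below illustrates that under stated assumptions. `KeyedRoutingSketch` is a hypothetical class, not part of Kafka, and its hash is a simplified stand-in (Kafka's default partitioner hashes the serialized key bytes with murmur2).

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration class; not part of Kafka or Spring Kafka.
class KeyedRoutingSketch {
    // One in-memory "log" per partition.
    private final List<List<String>> partitions = new ArrayList<>();

    KeyedRoutingSketch(int numPartitions) {
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new ArrayList<>());
        }
    }

    // Stand-in for the default partitioner: a deterministic hash of the key
    // bytes modulo the partition count. Kafka itself uses murmur2 here.
    int partitionFor(String key) {
        int hash = Arrays.hashCode(key.getBytes(StandardCharsets.UTF_8));
        return Math.floorMod(hash, partitions.size());
    }

    // Appends the value to the key's partition and returns its offset.
    // Nothing here inspects earlier records, so a repeated send is stored again.
    long send(String key, String value) {
        List<String> log = partitions.get(partitionFor(key));
        log.add(value);
        return log.size() - 1;
    }
}
```

Sending the same key twice yields two records at consecutive offsets of the same partition. If the tax id were the record key and the default partitioner were in use, both copies would be expected on one partition; the logs show them on partitions 0 and 1, so it may be worth checking what the producer actually uses as the key.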

You can add a RecordFilterStrategy implementation to the listener container factory to filter out the duplicates. https://docs.spring.io/spring-kafka/docs/2.6.2/reference/html/#filtering-messages
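A minimal sketch of the de-duplication logic such a filter could delegate to, assuming the tax id (`providerTinNumber`) is what identifies a duplicate and that an in-memory set is acceptable. `SeenTaxIdFilter` and `shouldDiscard` are hypothetical names; the block uses only the JDK so it can be run on its own.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper; the name and the in-memory store are assumptions.
class SeenTaxIdFilter {
    // Thread-safe set, shared by all listener threads in this JVM.
    private final Set<String> seen = ConcurrentHashMap.newKeySet();

    // Same convention as RecordFilterStrategy.filter: true means "discard".
    // Set.add is atomic on this set, so when two threads race on the same
    // tax id, exactly one gets false (keep) and the other gets true (discard).
    boolean shouldDiscard(String taxId) {
        return !seen.add(taxId);
    }
}
```

In Spring Kafka this could be wired in with something like `factory.setRecordFilterStrategy(record -> filter.shouldDiscard(record.value().get("providerTinNumber").toString()))`. Because the listener acknowledges manually, `factory.setAckDiscarded(true)` is probably also wanted so that discarded records still get acknowledged. The set here grows without bound, is lost on restart, and would also drop a legitimate later update for the same tax id, so a durable store or a time window may fit better in practice.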
