Recovering Kafka messages for non-retryable exceptions with an AfterRollbackProcessor



This follows up on the linked question - Kafka transaction rollback not working with RecordTooLargeException, with 3 topics.

In EDIT 3 of that question, I had the following problem:

How do I save the error to a DB and also send it to a DLQ while using the AfterRollbackProcessor?

I added addNotRetryableExceptions(RecordTooLargeException, IllegalArgumentException, CustomBusinessException) to the DefaultAfterRollbackProcessor.

After the recovery phase (saving the error to the DB and sending it to the DLQ), if a rebalance happens or the application restarts, the code retries the failed record (RecordTooLargeException) again. How do I skip the not-retryable errors on further attempts?

@Bean
AfterRollbackProcessor<Object, Object> arp() {
    DefaultAfterRollbackProcessor<Object, Object> darp = new DefaultAfterRollbackProcessor<>((rec, ex) -> {
        log.error("#### Failed to process {} from topic, partition {}-{}, @{}",
                rec.value(), rec.topic(), rec.partition(), rec.offset(), ex);
        // If the exception is not retryable, how do I tell Kafka not to redeliver this record after a restart?
    }, new FixedBackOff(3000L, 2));
    Class<? extends Exception>[] nre = new Class[2];
    nre[0] = RecordTooLargeException.class;
    nre[1] = IllegalArgumentException.class;
    darp.addNotRetryableExceptions(nre);
    return darp;
}
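For context, the processor only takes effect once it is set on the listener container factory, and it is only invoked when the container is transactional. A minimal wiring sketch, assuming the consumer factory and KafkaTransactionManager beans from the linked question (bean names here are illustrative):

@Bean
ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory(
        ConsumerFactory<Object, Object> consumerFactory,
        KafkaTransactionManager<Object, Object> transactionManager,
        AfterRollbackProcessor<Object, Object> arp) {
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    // The AfterRollbackProcessor is only invoked for transactional containers
    factory.getContainerProperties().setTransactionManager(transactionManager);
    factory.setAfterRollbackProcessor(arp);
    return factory;
}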

Based on the suggestions, I updated the code as follows:


@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

@Autowired
private DBHandler dbHandler;

@Bean
AfterRollbackProcessor<Object, Object> arp() {
    DefaultAfterRollbackProcessor<Object, Object> darp = new DefaultAfterRollbackProcessor<>((rec, ex) -> {
        log.error("#### Failed to process {} from topic, partition {}-{}, @{}",
                rec.value(), rec.topic(), rec.partition(), rec.offset(), ex);
        // Save the failed record to the DB
        dbHandler.handleFailure((String) rec.key(), (String) rec.value(), ex, rec.topic());

        // I also want to send the record to a DLQ - how? (solved below)

    }, new FixedBackOff(3000L, 3), kafkaTemplate, true);
    Class<? extends Exception>[] nre = new Class[2];
    nre[0] = RecordTooLargeException.class;
    nre[1] = IllegalArgumentException.class;
    darp.addNotRetryableExceptions(nre);
    return darp;
}
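For reference, a minimal sketch of what the DBHandler used above could look like (the table and column names are assumptions, and JdbcTemplate is just one way to persist the failure):

@Service
public class DBHandler {

    private final JdbcTemplate jdbcTemplate;

    public DBHandler(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    // Persists one failed record; table/column names are illustrative
    public void handleFailure(String key, String value, Exception ex, String topic) {
        jdbcTemplate.update(
                "INSERT INTO failed_records (record_key, record_value, error, topic) VALUES (?, ?, ?, ?)",
                key, value, ex.getMessage(), topic);
    }
}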

Eventually I found a solution.

First, I created the class below, which dumps the error record to the DB during the recovery phase:

@Slf4j
@Service
public class DBPublishingRecordRecoverer implements ConsumerRecordRecoverer {

    @Override
    public void accept(ConsumerRecord<?, ?> rec, Exception ex) {
        // The actual DB write goes here; only the logging is shown for brevity
        log.error("@ DB Operation |  process {} from topic, partition {}-{}, @{}",
                rec.value(), rec.topic(), rec.partition(), rec.offset(), ex.getMessage());
    }
}

Then I created a class that publishes the same failed record to the DLT:


@Slf4j
@Service
public class DLTRecordRecoverer {

    public DeadLetterPublishingRecoverer dlr(@Nullable KafkaOperations<?, ?> kafkaOperations) {
        return new DeadLetterPublishingRecoverer(kafkaOperations) {

            @Override
            public void accept(ConsumerRecord<?, ?> record, Exception exception) {
                log.info("DLQ to process {} from topic, partition {}-{}, @{}",
                        record.value(), record.topic(), record.partition(), record.offset(), exception.getMessage());
                super.accept(record, exception);
            }
        };
    }
}
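By default, DeadLetterPublishingRecoverer publishes to a topic named <original topic>.DLT and to the same partition as the failed record, which is why TEST-TOPIC.DLT shows up in the logs below. If a different destination is needed, a resolver can be passed to the constructor; a minimal sketch (the ".DLQ" suffix is just an example):

public DeadLetterPublishingRecoverer dlrWithCustomDestination(KafkaOperations<?, ?> kafkaOperations) {
    // Route failures to "<topic>.DLQ" instead of the default "<topic>.DLT",
    // keeping the original partition
    return new DeadLetterPublishingRecoverer(kafkaOperations,
            (record, exception) -> new TopicPartition(record.topic() + ".DLQ", record.partition()));
}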

Now plug these two recoverers into the AfterRollbackProcessor:


@Bean
AfterRollbackProcessor<Object, Object> xyz() {
    // testRecoverer is the autowired DBPublishingRecordRecoverer; andThen() chains
    // the DB recoverer with the dead-letter publisher
    DefaultAfterRollbackProcessor<Object, Object> darp = new DefaultAfterRollbackProcessor<>(testRecoverer
            .andThen(dltRecordRecoverer.dlr(kafkaTemplate)),
            new FixedBackOff(3000L, 3), kafkaTemplate, true);
    Class<? extends Exception>[] nre = new Class[2];
    nre[0] = RecordTooLargeException.class;
    nre[1] = IllegalArgumentException.class;
    darp.addNotRetryableExceptions(nre);
    return darp;
}

Log output:

c.t.t.demo.DBPublishingRecordRecoverer   : @ DB Operation |  process Another example from topic, partition TEST-TOPIC-2, @20
c.t.transaction.demo.DLTRecordRecoverer  : DLQ to process Another example from topic, partition TEST-TOPIC-2, @20
o.a.k.c.p.internals.TransactionManager   : [Producer clientId=raw-item-producer-client-1, transactionalId=tx-01d1a934-3c0e-45b4-ac1f-5b8fa

And in the consumer application:

KafkaMessageListenerContainer    : aem-dam-edm-group-id: partitions assigned: [PRICE-TOPIC-0, PRICE-TOPIC-1, PRICE-TOPIC-2]
KafkaMessageListenerContainer    : aem-dam-edm-group-id: partitions assigned: [ITEM-TOPIC-1, ITEM-TOPIC-2, ITEM-TOPIC-0]
KafkaMessageListenerContainer    : aem-dam-edm-group-id: partitions assigned: [INVENTORY-TOPIC-1, INVENTORY-TOPIC-0, INVENTORY-TOPIC-2]
KafkaMessageListenerContainer    : aem-dam-edm-group-id: partitions assigned: [TEST-TOPIC.DLT-1, TEST-TOPIC.DLT-0, TEST-TOPIC.DLT-2]
c.t.transaction.demo.ConsumerService  : Received payload. Topic : TEST-TOPIC.DLT , key :TestKey-002 , value : Another example
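The .DLT topic is an ordinary topic, so a plain listener is enough to drain it. A minimal sketch of the consumer that produced the line above (the method name and group id are illustrative):

@KafkaListener(topics = "TEST-TOPIC.DLT", groupId = "aem-dam-edm-group-id")
public void consumeDlt(ConsumerRecord<String, String> record) {
    // Mirrors the "Received payload" log line shown above
    log.info("Received payload. Topic : {} , key :{} , value : {}",
            record.topic(), record.key(), record.value());
}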

To commit the offset of the recovered record, a transactional KafkaTemplate must be passed to the DefaultAfterRollbackProcessor and commitRecovered must be set to true. See the javadocs:

/**
* Construct an instance with the provided recoverer which will be called after the
* backOff returns STOP for a topic/partition/offset.
* @param recoverer the recoverer; if null, the default (logging) recoverer is used.
* @param backOff the {@link BackOff}.
* @param kafkaOperations for sending the recovered offset to the transaction.
* @param commitRecovered true to commit the recovered record's offset; requires a
* {@link KafkaOperations}.
* @since 2.5.3
*/
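A minimal sketch of such a transactional KafkaTemplate (the broker address, serializers, and "tx-" prefix are assumptions; any unique prefix works). Setting a transactionIdPrefix on the producer factory is what makes the template transactional, matching the transactionalId=tx-... in the producer log above:

@Bean
public ProducerFactory<String, String> producerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(props);
    // Any transactionIdPrefix makes the resulting KafkaTemplate transactional
    pf.setTransactionIdPrefix("tx-");
    return pf;
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> pf) {
    return new KafkaTemplate<>(pf);
}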
