有没有一种方法可以在春季kafka 2.8中为earlyRecordInterceptor调用自定义的ErrorHand



在spring-kafka 2.5.x,2.6.x(非事务性情况(中,如果我添加一个自定义的RecordInterceptor,这个RecordInterceptor将被方法doInvokeRecodListener(record, iterator)调用(因为没有earlyRecordInterceptor(,在方法内部,它是invokedErrorHandler,它可以让我在这里执行一些自定义的错误汉,spring-ka夫ka 2.6.x源代码。(我在下面列出了spring-kafka的关键源代码(

private RuntimeException doInvokeRecordListener(final ConsumerRecord<K, V> record,
Iterator<ConsumerRecord<K, V>> iterator) {
Object sample = startMicrometerSample();
try {
invokeOnMessage(record);
successTimer(sample);
recordInterceptAfter(record, null);
}
catch (RuntimeException e) {
...
recordInterceptAfter(record, e);
if (this.errorHandler == null) {
throw e;
}
try {
invokeErrorHandler(record, iterator, e); // here can invoke my customized Errorhandler
....
return null;
}
但在spring kafka 2.8中,spring kaf卡将interceptBeforTx设置为true(根据此处(,然后我的cusomInterceptor的调用过程将被放入earlyRecordInterceptor。但不幸的是,earlyRecordInterceptor的过程没有调用任何ErrorHandler,那么我就无法调用我的自定义ErrorHandler,spring-kafka2.8.x源代码,请参阅此处。

private ConsumerRecord<K, V> checkEarlyIntercept(ConsumerRecord<K, V> nextArg) {
ConsumerRecord<K, V> next = nextArg;
if (this.earlyRecordInterceptor != null) {
next = this.earlyRecordInterceptor.intercept(next, this.consumer);
if (next == null) {
this.logger.debug(() -> "RecordInterceptor returned null, skipping: "
+ ListenerUtils.recordToString(nextArg));
}
}
return next;
}
我的问题是为什么spring-kafka不为earlyRecordInterceptor调用errorhandle?如果可以增强?(也可能像doInvokeRecordListener那样在earlyRecordInterceptor方法的过程中调用错误处理程序?(

是否有方法调用earlyRecordInterceptor的错误处理程序?

感谢您的帮助

earlyRecordInterceptor仅用于调用;在";方法(intercept(。

它只是对与从recordInterceptAfter调用的commonRecordInterceptor相同的拦截器的引用。

当不使用事务时,从不调用早期拦截器,但要恢复到事务的以前行为,请将interceptBeforTx设置为false。

private final RecordInterceptor<K, V> earlyRecordInterceptor =
isInterceptBeforeTx() || this.kafkaTxManager == null
? getRecordInterceptor()
: null;
private final RecordInterceptor<K, V> commonRecordInterceptor = getRecordInterceptor();

最新更新