Handle deserialization errors and other exceptions separately



Using Spring Cloud Stream from the Spring Cloud Hoxton.SR12 release with the Kafka binder. Boot version: 2.5.2.

Problem statement:

  • I want to handle deserialization errors by pushing them to a poison-pill topic, with no retries.

  • Handle any other exception by retrying and then pushing to a parkingLot topic.

  • Do not retry ValidationException.

Here is my error handling code so far:

@Configuration
@Slf4j
public class ErrorHandlingConfig {

    @Value("${errorHandling.parkingLotDestination}")
    private String parkingLotDestination;

    @Value("${errorHandling.retryAttempts}")
    private long retryAttempts;

    @Value("${errorHandling.retryIntervalMillis}")
    private long retryIntervalMillis;

    @Bean
    public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer(
            SeekToCurrentErrorHandler errorHandler) {
        return (container, dest, group) -> container.setErrorHandler(errorHandler);
    }

    @Bean
    public SeekToCurrentErrorHandler errorHandler(DeadLetterPublishingRecoverer parkingLotPublisher) {
        SeekToCurrentErrorHandler handler = new SeekToCurrentErrorHandler(parkingLotPublisher,
                new FixedBackOff(retryIntervalMillis, retryAttempts));
        handler.addNotRetryableExceptions(ValidationException.class);
        return handler;
    }

    @Bean
    public DeadLetterPublishingRecoverer parkingLotPublisher(KafkaOperations<byte[], byte[]> bytesTemplate) {
        DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(bytesTemplate,
                (cr, e) -> new TopicPartition(parkingLotDestination, cr.partition()));
        recoverer.setHeadersFunction((cr, e) -> cr.headers());
        return recoverer;
    }
}

I think what I have so far should cover the retryable exceptions being pushed to the parking lot. How do I now add the code to push failed deserialization events to the poison-pill topic?

Because of the outstanding issue of not being able to send to a custom dlqName, I want to do this at the container level, outside of the binder/binding configuration.

I could use an ErrorHandlingDeserializer and call setFailedDeserializationFunction() on it with a function that sends the message to the poison topic. Should I do that using a source binding or a raw KafkaOperations? I also need to figure out how to hook this ErrorHandlingDeserializer into the ConsumerFactory.

Why are you using Hoxton with Boot 2.5? The proper Spring Cloud version for Boot 2.5.2 is 2020.0.3.

The SeekToCurrentErrorHandler already considers DeserializationException to be fatal. See:

/**
* Add exception types to the default list. By default, the following exceptions will
* not be retried:
* <ul>
* <li>{@link DeserializationException}</li>
* <li>{@link MessageConversionException}</li>
* <li>{@link ConversionException}</li>
* <li>{@link MethodArgumentResolutionException}</li>
* <li>{@link NoSuchMethodException}</li>
* <li>{@link ClassCastException}</li>
* </ul>
* All others will be retried.
* @param exceptionTypes the exception types.
* @since 2.6
* @see #removeNotRetryableException(Class)
* @see #setClassifications(Map, boolean)
*/
@SafeVarargs
@SuppressWarnings("varargs")
public final void addNotRetryableExceptions(Class<? extends Exception>... exceptionTypes) {

The ErrorHandlingDeserializer (without a function) adds the exception to a header; the DeadLetterPublishingRecoverer automatically extracts the original payload from the header and sets it as the value() of the outgoing record.

Since you are using native encoding, you will need two KafkaTemplates: one for failed records that need to be re-serialized, and one for DeserializationExceptions (using a ByteArraySerializer).

See:

/**
* Create an instance with the provided templates and destination resolving function,
* that receives the failed consumer record and the exception and returns a
* {@link TopicPartition}. If the partition in the {@link TopicPartition} is less than
* 0, no partition is set when publishing to the topic. The templates map keys are
* classes and the value the corresponding template to use for objects (producer
* record values) of that type. A {@link java.util.LinkedHashMap} is recommended when
* there is more than one template, to ensure the map is traversed in order. To send
* records with a null value, add a template with the {@link Void} class as a key;
* otherwise the first template from the map values iterator will be used.
* @param templates the {@link KafkaOperations}s to use for publishing.
* @param destinationResolver the resolving function.
*/
@SuppressWarnings("unchecked")
public DeadLetterPublishingRecoverer(Map<Class<?>, KafkaOperations<? extends Object, ? extends Object>> templates,
BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver) {
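Applied to the configuration in the question, a two-template recoverer could be sketched as follows. This is only an illustration under assumptions: the bean names, the MyEvent value type, and the topic names are placeholders, not code from the question.

```java
import java.util.LinkedHashMap;
import java.util.Map;

import org.apache.kafka.common.TopicPartition;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.support.serializer.DeserializationException;

@Configuration
public class RecovererConfig {

    // Hypothetical value type produced by the application's deserializer.
    static class MyEvent { }

    @Bean
    public DeadLetterPublishingRecoverer recoverer(
            KafkaOperations<byte[], byte[]> bytesTemplate,   // ByteArraySerializer, for raw payloads
            KafkaOperations<byte[], MyEvent> eventTemplate) { // re-serializes successfully deserialized values

        // Keys are producer-record value classes; a LinkedHashMap is recommended
        // so the map is traversed in order. Records whose value is byte[] (the
        // raw payload recovered from a failed deserialization) match the first
        // entry; everything else falls through to the second.
        Map<Class<?>, KafkaOperations<?, ?>> templates = new LinkedHashMap<>();
        templates.put(byte[].class, bytesTemplate);
        templates.put(Object.class, eventTemplate);

        // Route deserialization failures to the poison-pill topic and all
        // other failures to the parking lot (topic names are assumptions).
        return new DeadLetterPublishingRecoverer(templates, (cr, e) ->
                e instanceof DeserializationException
                        ? new TopicPartition("poison-pill-topic", cr.partition())
                        : new TopicPartition("parking-lot-topic", cr.partition()));
    }
}
```

This bean would replace the single-template parkingLotPublisher bean from the question; the SeekToCurrentErrorHandler wiring stays the same.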

I also need to figure out how to hook this ErrorHandlingDeserializer into the ConsumerFactory.

Just set the appropriate properties - see the documentation.
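For the Kafka binder, that could look like the consumer configuration below. The property names follow the Spring Kafka / Spring Cloud Stream conventions; the delegate class shown is a placeholder for whatever value deserializer the application actually uses.

```properties
# Wrap the real value deserializer in an ErrorHandlingDeserializer
# (binder-level consumer configuration).
spring.cloud.stream.kafka.binder.configuration.value.deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer

# The deserializer to delegate to - substitute your real one here.
spring.cloud.stream.kafka.binder.configuration.spring.deserializer.value.delegate.class=org.springframework.kafka.support.serializer.JsonDeserializer
```

When the delegate throws, the ErrorHandlingDeserializer returns a record with the failure captured in a header, which the error handler and recoverer then process as described above.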
