使用带有Kafka Binder的spring cloud Hoxton.SR12版本中的spring cloud流。引导版本:2.5.2
问题说明:
-
我想通过将取消序列化错误推到毒丸主题来处理,而不需要重试。
-
通过重试然后推送到parkingLot主题来处理任何其他异常。
-
不要重试ValidationException
这是我迄今为止的错误处理代码:
@Configuration
@Slf4j
public class ErrorHandlingConfig {
@Value("${errorHandling.parkingLotDestination}")
private String parkingLotDestination;
@Value("${errorHandling.retryAttempts}")
private long retryAttempts;
@Value("${errorHandling.retryIntervalMillis}")
private long retryIntervalMillis;
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer(SeekToCurrentErrorHandler errorHandler) {
return (container, dest, group) -> {
container.setErrorHandler(errorHandler);
};
}
@Bean
public SeekToCurrentErrorHandler errorHandler(DeadLetterPublishingRecoverer parkingLotPublisher) {
SeekToCurrentErrorHandler seekToCurrentErrorHandler = new SeekToCurrentErrorHandler(parkingLotPublisher, new FixedBackOff(retryIntervalMillis, retryAttempts));
seekToCurrentErrorHandler.addNotRetryableExceptions(ValidationException.class);
return seekToCurrentErrorHandler;
}
@Bean
public DeadLetterPublishingRecoverer parkingLotPublisher(KafkaOperations bytesTemplate) {
DeadLetterPublishingRecoverer deadLetterPublishingRecoverer = new DeadLetterPublishingRecoverer(bytesTemplate, (cr, e) -> new TopicPartition(parkingLotDestination, cr.partition()));
deadLetterPublishingRecoverer.setHeadersFunction((cr, e) -> cr.headers());
return deadLetterPublishingRecoverer;
}
}
我认为到目前为止,我所拥有的应该包括被推到停车场的可重试的例外情况。我现在该如何添加代码来将失败的取消序列化事件推送到中毒主题?
由于无法发送到自定义dlqName这一悬而未决的问题,我希望在绑定器/绑定配置之外,在容器级别执行此操作。
我可以使用ErrorHandlingDeserializer并在其上调用setFailedDeserializationFunction()
,该CCD_1将包含一个将消息发送到毒主题的函数。我应该使用源绑定还是原始KafkaOperations来完成此操作?我还需要弄清楚如何将这个ErrorHandingDeserializer挂接到ConsumerFactory中。
为什么在Boot 2.5中使用Hoxton?Boot 2.5.2的正确云版本是2020.0.3
。
SeekToCurrentErrorHandler
已经认为DeserializationException
是致命的。参见
/**
* Add exception types to the default list. By default, the following exceptions will
* not be retried:
* <ul>
* <li>{@link DeserializationException}</li>
* <li>{@link MessageConversionException}</li>
* <li>{@link ConversionException}</li>
* <li>{@link MethodArgumentResolutionException}</li>
* <li>{@link NoSuchMethodException}</li>
* <li>{@link ClassCastException}</li>
* </ul>
* All others will be retried.
* @param exceptionTypes the exception types.
* @since 2.6
* @see #removeNotRetryableException(Class)
* @see #setClassifications(Map, boolean)
*/
@SafeVarargs
@SuppressWarnings("varargs")
public final void addNotRetryableExceptions(Class<? extends Exception>... exceptionTypes) {
ErrorHandlingDeserializer
(没有函数(将异常添加到报头;CCD_ 6自动从报头中提取原始有效载荷并设置为传出记录的CCD_。
由于您使用的是本机编码,因此需要两个KafkaTemplate
,一个用于需要重新序列化的失败记录,另一个用于DeserializationException
(使用ByteArraySerializer
.
参见
/**
* Create an instance with the provided templates and destination resolving function,
* that receives the failed consumer record and the exception and returns a
* {@link TopicPartition}. If the partition in the {@link TopicPartition} is less than
* 0, no partition is set when publishing to the topic. The templates map keys are
* classes and the value the corresponding template to use for objects (producer
* record values) of that type. A {@link java.util.LinkedHashMap} is recommended when
* there is more than one template, to ensure the map is traversed in order. To send
* records with a null value, add a template with the {@link Void} class as a key;
* otherwise the first template from the map values iterator will be used.
* @param templates the {@link KafkaOperations}s to use for publishing.
* @param destinationResolver the resolving function.
*/
@SuppressWarnings("unchecked")
public DeadLetterPublishingRecoverer(Map<Class<?>, KafkaOperations<? extends Object, ? extends Object>> templates,
BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver) {
我还需要弄清楚如何将这个ErrorHandingDeserializer挂接到ConsumerFactory中。
只需设置适当的属性-请参阅文档。