ValueSerializer, SerializationException and DLT: how do I make serialization work?



I am exploring Spring Boot with Kafka and I am running into a problem with the DLT and its serializers.

In my DLT I want all messages that caused a SerializationException, as well as all (valid) messages that caused a technical or functional exception. My message is an XML message, which I can easily deserialize with a deserializer and a JAXB-generated object.

The problem is the value serializer:

  • If I send a garbage message (not XML, just a random string…) that causes a SerializationException, I need a ByteArraySerializer (record.value() is a byte[]).
  • If I get a technical or functional exception, I need a MyObjectSerializer… (record.value() is a MyObject).

How do I handle this? (I am currently on spring-kafka 2.5.5 and kafka-clients 2.5.1.)

The best I have managed to get working is a single Object serializer, but then I lose the String representation of MyObject…

Assuming you are using the DeadLetterPublishingRecoverer, use one of these constructors…

/**
* Create an instance with the provided templates and a default destination resolving
* function that returns a TopicPartition based on the original topic (appended with
* ".DLT") from the failed record, and the same partition as the failed record.
* Therefore the dead-letter topic must have at least as many partitions as the
* original topic. The templates map keys are classes and the value the corresponding
* template to use for objects (producer record values) of that type. A
* {@link java.util.LinkedHashMap} is recommended when there is more than one
* template, to ensure the map is traversed in order. To send records with a null
* value, add a template with the {@link Void} class as a key; otherwise the first
* template from the map values iterator will be used.
* @param templates the {@link KafkaOperations}s to use for publishing.
*/
public DeadLetterPublishingRecoverer(Map<Class<?>, KafkaOperations<? extends Object, ? extends Object>> templates) {

/**
* Create an instance with the provided templates and destination resolving function,
* that receives the failed consumer record and the exception and returns a
* {@link TopicPartition}. If the partition in the {@link TopicPartition} is less than
* 0, no partition is set when publishing to the topic. The templates map keys are
* classes and the value the corresponding template to use for objects (producer
* record values) of that type. A {@link java.util.LinkedHashMap} is recommended when
* there is more than one template, to ensure the map is traversed in order. To send
* records with a null value, add a template with the {@link Void} class as a key;
* otherwise the first template from the map values iterator will be used.
* @param templates the {@link KafkaOperations}s to use for publishing.
* @param destinationResolver the resolving function.
*/
@SuppressWarnings("unchecked")
public DeadLetterPublishingRecoverer(Map<Class<?>, KafkaOperations<? extends Object, ? extends Object>> templates,
BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver) {

They take a Map of KafkaOperations keyed by the class of the producer record value.
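For example, a minimal sketch of the two-template approach (MyObject and MyObjectSerializer are the placeholder names from the question; the bootstrap server and key serializer are assumptions):

import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;

// Shared producer properties; each template adds its own value serializer.
Map<String, Object> common = new HashMap<>();
common.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
common.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

// Template for raw byte[] payloads (records that failed deserialization).
Map<String, Object> bytesProps = new HashMap<>(common);
bytesProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
KafkaOperations<String, byte[]> bytesTemplate =
        new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(bytesProps));

// Template for valid MyObject payloads (technical/functional failures).
Map<String, Object> objectProps = new HashMap<>(common);
objectProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, MyObjectSerializer.class);
KafkaOperations<String, MyObject> objectTemplate =
        new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(objectProps));

// LinkedHashMap keeps lookup order deterministic: byte[] record values go
// out through bytesTemplate, MyObject values through objectTemplate.
Map<Class<?>, KafkaOperations<?, ?>> templates = new LinkedHashMap<>();
templates.put(byte[].class, bytesTemplate);
templates.put(MyObject.class, objectTemplate);

DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(templates);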

You can create two producer factories, as in the sketch above, or simply override the value serializer per template with this constructor…

/**
* Create an instance using the supplied producer factory and properties, with
* autoFlush false. If the configOverrides is not null or empty, a new
* {@link DefaultKafkaProducerFactory} will be created with merged producer properties
* with the overrides being applied after the supplied factory's properties.
* @param producerFactory the producer factory.
* @param configOverrides producer configuration properties to override.
* @since 2.5
*/
public KafkaTemplate(ProducerFactory<K, V> producerFactory, @Nullable Map<String, Object> configOverrides) {
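A minimal sketch of that variant, reusing the placeholder names from above: both DLT templates share one factory, and only the value serializer is overridden per template (the base config deliberately sets no value serializer, since each override supplies one):

import java.util.Collections;

import org.springframework.kafka.core.ProducerFactory;

ProducerFactory<String, Object> pf = new DefaultKafkaProducerFactory<>(common);

// Same factory, a different value serializer per template; per the Javadoc, a
// new factory is created internally with the merged, overridden properties.
KafkaTemplate<String, Object> bytesTemplate = new KafkaTemplate<>(pf, Collections
        .singletonMap(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class));
KafkaTemplate<String, Object> objectTemplate = new KafkaTemplate<>(pf, Collections
        .singletonMap(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, MyObjectSerializer.class));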

https://docs.spring.io/spring-kafka/docs/current/reference/html/#dead-letters
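Whichever variant you pick, the recoverer then has to be wired into the container's error handler. Note that a SerializationException only reaches the error handler if the consumer is configured with the ErrorHandlingDeserializer; otherwise the record never gets past the poll loop. A sketch of the 2.5.x wiring, where factory is assumed to be your ConcurrentKafkaListenerContainerFactory:

import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

// Two redelivery attempts with no back-off, then publish to the DLT.
factory.setErrorHandler(new SeekToCurrentErrorHandler(recoverer, new FixedBackOff(0L, 2L)));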
