Spring Cloud Stream with Kafka DLT for handling poison pills


  • Spring Boot 2.5.2
  • Spring Cloud Hoxton.SR12
  • spring-kafka 2.6.7 (downgraded because of this issue: https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1079)

I followed this recipe for handling deserialization errors: https://github.com/spring-cloud/spring-cloud-stream-samples/blob/main/recipes/recipe-3-handling-deserialization-errors-dlq-kafka.adoc

I created the beans mentioned in the recipe above:

import lombok.extern.slf4j.Slf4j;

import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;

@Configuration
@Slf4j
public class ErrorHandlingConfig {

    @Bean
    public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer(
            SeekToCurrentErrorHandler errorHandler) {
        return (container, dest, group) -> {
            container.setErrorHandler(errorHandler);
        };
    }

    @Bean
    public SeekToCurrentErrorHandler errorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
        return new SeekToCurrentErrorHandler(deadLetterPublishingRecoverer);
    }

    @Bean
    public DeadLetterPublishingRecoverer publisher(KafkaOperations bytesTemplate) {
        return new DeadLetterPublishingRecoverer(bytesTemplate);
    }
}

Configuration file:

spring:
  cloud:
    stream:
      default:
        producer:
          useNativeEncoding: true
        consumer:
          useNativeDecoding: true
      bindings:
        myInboundRoute:
          destination: some-destination.1
          group: a-custom-group
        myOutboundRoute:
          destination: some-destination.2
      kafka:
        binder:
          brokers: localhost
          defaultBrokerPort: 9092
          configuration:
            application:
              security: PLAINTEXT
        bindings:
          myInboundRoute:
            consumer:
              autoCommitOffset: true
              startOffset: latest
              enableDlq: true
              dlqName: my-dql.poison
              dlqProducerProperties:
                configuration:
                  value.serializer: myapp.serde.MyCustomSerializer
              configuration:
                value.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
                spring.deserializer.value.delegate.class: myapp.serde.MyCustomSerializer
          myOutboundRoute:
            producer:
              configuration:
                key.serializer: org.apache.kafka.common.serialization.StringSerializer
                value.serializer: myapp.serde.MyCustomSerializer

I expected the DLT to be called my-dql.poison. That topic does in fact get created fine, but I also get a second, auto-created topic called some-destination.1.DLT. Why does it create that one in addition to the one I named with dlqName in the configuration?

What am I doing wrong? When I poll for messages, they end up in the auto-created some-destination.1.DLT, not in my dlqName topic.

  1. If you configure a SeekToCurrentErrorHandler (STCEH) in the container, you should not also configure DLT handling in the binding. Also set maxAttempts=1 to disable retries there; a sketch of the resulting binding configuration follows below.

  2. You need to configure a destination resolver on the DeadLetterPublishingRecoverer (DLPR) so that it publishes to a different name, using the constructor quoted below.
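
Per point 1, a minimal sketch of what the inbound binding from the question could look like with binder-side retries and DLQ handling switched off (maxAttempts is the standard Spring Cloud Stream consumer property; everything else re-uses names from the question's configuration):

spring:
  cloud:
    stream:
      bindings:
        myInboundRoute:
          destination: some-destination.1
          group: a-custom-group
          consumer:
            maxAttempts: 1   # disable binder retries; the container's STCEH drives recovery
      kafka:
        bindings:
          myInboundRoute:
            consumer:
              # enableDlq / dlqName removed: DLT publishing is done by the
              # DeadLetterPublishingRecoverer configured on the listener container
              configuration:
                value.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
                spring.deserializer.value.delegate.class: myapp.serde.MyCustomSerializer

For point 2, the relevant DeadLetterPublishingRecoverer constructor is: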

/**
* Create an instance with the provided template and destination resolving function,
* that receives the failed consumer record and the exception and returns a
* {@link TopicPartition}. If the partition in the {@link TopicPartition} is less than
* 0, no partition is set when publishing to the topic.
* @param template the {@link KafkaOperations} to use for publishing.
* @param destinationResolver the resolving function.
*/
public DeadLetterPublishingRecoverer(KafkaOperations<? extends Object, ? extends Object> template,
        BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver) {

    this(Collections.singletonMap(Object.class, template), destinationResolver);
}
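
A minimal sketch of the publisher bean from the question rewritten to use that constructor, re-using the my-dql.poison name from the question's config (the -1 partition relies on the behaviour described in the Javadoc above; TopicPartition is org.apache.kafka.common.TopicPartition):

@Bean
public DeadLetterPublishingRecoverer publisher(KafkaOperations<byte[], byte[]> bytesTemplate) {
    // Send every failed record to the custom DLT; a partition of -1 means no
    // partition is set on the outgoing record, so the producer picks one.
    return new DeadLetterPublishingRecoverer(bytesTemplate,
            (consumerRecord, exception) -> new TopicPartition("my-dql.poison", -1));
}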

See https://docs.spring.io/spring-kafka/docs/current/reference/html/#dead-letters

There is an open issue about configuring the DLPR with the DLT name from the binding:

https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1031
