如何在Spring Cloud Stream Kafka Binder中应用死信队列的保留时间配置?



我有一个使用Spring Cloud Stream Kafka的应用程序。对于用户定义的主题,我可以通过给出下面提到的配置来删除指定主题中的记录。但是这个配置不适用于DLQ主题。

例如,在下面的配置中,我将保留时间配置为粘合剂级别。所以我在绑定级别下定义的制作人主题(学生主题)是正确配置的,当主题日志超过指定的保留字节(300000000)时,我可以检查记录是否被删除。

但是粘合剂级别的保留时间对DLQ topic(person-topic-error-dlq)不起作用。对于DLQ主题的清理记录,除了保留时间之外,是否有其他不同的配置?

我该怎么做?

spring:
cloud:
stream:
kafka:
bindings:
person-topic-in:
consumer:
enableDlq: true
dlqName: person-topic-error-dlq
binders:
defaultKafka:
type: kafka
environment:
spring:
cloud:
stream:
kafka:
default:
producer:
topic:
properties:
retention.bytes: 300000000
segment.bytes: 300000000
binder:
brokers: localhost:19092
bindings:
person-topic-in:
binder: defaultKafka
destination: person-topic
contentType: application/json
group: person-topic-group
student-topic-out:
binder: defaultKafka
destination: student-topic
contentType: application/json

你只是在为生产者绑定设置(默认)属性。

也就是说,这仍然不适合我:

binders:
defaultKafka:
type: kafka
environment:
spring:
cloud:
stream:
kafka:
default:
producer:
topic:
properties:
retention.bytes: 300000000
segment.bytes: 300000000
consumer:
topic:
properties:
retention.bytes: 300000000
segment.bytes: 300000000

(这些属性甚至不应用于主主题)。

看起来kafka的默认消费者绑定属性有问题。

这对我有用;这些属性应用于主主题和死信主题:

spring:
cloud:
stream:
kafka:
bindings:
person-topic-in:
consumer:
enableDlq: true
dlqName: person-topic-error-dlq
topic:
properties:
retention.bytes: 300000000
segment.bytes: 300000000

最新更新