我有一个使用Spring Cloud Stream Kafka的应用程序。对于用户定义的主题,我可以通过给出下面提到的配置来删除指定主题中的记录。但是这个配置不适用于DLQ主题。
例如,在下面的配置中,我将保留时间配置为粘合剂级别。所以我在绑定级别下定义的制作人主题(学生主题)是正确配置的,当主题日志超过指定的保留字节(300000000)时,我可以检查记录是否被删除。
但是粘合剂级别的保留时间对DLQ topic(person-topic-error-dlq)不起作用。对于DLQ主题的清理记录,除了保留时间之外,是否有其他不同的配置?
我该怎么做?
spring:
cloud:
stream:
kafka:
bindings:
person-topic-in:
consumer:
enableDlq: true
dlqName: person-topic-error-dlq
binders:
defaultKafka:
type: kafka
environment:
spring:
cloud:
stream:
kafka:
default:
producer:
topic:
properties:
retention.bytes: 300000000
segment.bytes: 300000000
binder:
brokers: localhost:19092
bindings:
person-topic-in:
binder: defaultKafka
destination: person-topic
contentType: application/json
group: person-topic-group
student-topic-out:
binder: defaultKafka
destination: student-topic
contentType: application/json
你只是在为生产者绑定设置(默认)属性。
也就是说,这仍然不适合我:
binders:
defaultKafka:
type: kafka
environment:
spring:
cloud:
stream:
kafka:
default:
producer:
topic:
properties:
retention.bytes: 300000000
segment.bytes: 300000000
consumer:
topic:
properties:
retention.bytes: 300000000
segment.bytes: 300000000
(这些属性甚至不应用于主主题)。
看起来kafka的默认消费者绑定属性有问题。
这对我有用;这些属性应用于主主题和死信主题:
spring:
cloud:
stream:
kafka:
bindings:
person-topic-in:
consumer:
enableDlq: true
dlqName: person-topic-error-dlq
topic:
properties:
retention.bytes: 300000000
segment.bytes: 300000000