如何在Spring Boot中配置频率(一天一次)来读取Kafka主题中的消息



我正在尝试处理故障,并将消息写入failure-Q,以防由于数据库关闭或任何其他问题导致消息在数据库持久化期间失败。

我们需要每天重新尝试一次来自failure-Q的消息。因为我们正在使用属性文件来配置消费者配置:

# KAFKA
spring.kafka.bootstrap-servers=localhost:9092
## CONSUMER
kafka.topic.consumer.boot: tink_boot.j
kafka.consumer.group-id: tink_boot.j-001
### Kafka from 5000 to 60000
spring.kafka.consumer.auto-commit-interval: 5000
spring.kafka.consumer.client-id: service
### start reading from earliest messages
spring.kafka.consumer.auto-offset-reset: earliest

KafkaListener:

@KafkaListener(topics = "${kafka.topic.consumer.boot}", groupId = "${kafka.consumer.group-id}")
public void receiveEvent(Event e){
log.info("message received: " + e);
}

那么,如何在Spring Boot中每天读取一次基于特定时间的Kafka-Q消息呢?

Apache Kafka是为流式消息服务而构建的,因此我建议您运行一个始终从该队列读取消息并处理消息的使用者
如果由于某种原因您不能立即处理它们,请将它们泄漏到适合日常批量处理的其他数据库中,例如:

  1. 本地文件系统中的每日文件夹
  2. HDFS(Hadoop分布式文件系统(中的每日文件夹
  3. ElasticSearch中的每日滚动索引

您可以使用KafkaConnect,Confluent已经构建了一些连接器,包括我们示例中的ElasticSearch和HDFS。

最新更新