Apache Camel Kafka-汇总KAFKA消息并定期发布到其他主题



我有一个用户:

我需要定期阅读和汇总来自KAFKA主题的消息,然后发布到另一个主题。LocalStorage不是一个选择。这就是我打算解决这个问题的方式,欢迎任何改进的建议

安排Kafka消息的汇总和发布,计划使用Contegregator EIP的完成Interval选项。这是代码。

  @Autowired ObjectMapper objectMapper;
  JacksonDataFormat jacksonDataFormat;
  @PostConstruct
  public void initialize(){
    //objectMapper.setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE);
    jacksonDataFormat = new JacksonDataFormat(objectMapper,EventMessage.class);
  }

和路线:

public void configure() throws Exception {
    from("kafka:localhost:9092?topic=item-events" +
            "&groupId=aggregator-group-id&autoCommitIntervalMs=25000&autoOffsetReset=earliest&consumersCount=1")
            .routeId("kafkapoller")
            .unmarshal(jacksonDataFormat)
            .aggregate(body().method("getItemId"), new EventAggregationStrategy()).completionInterval(20000)
            .marshal().json(JsonLibrary.Jackson)
            .to("kafka:localhost:9092?topic=item-events-aggregated&serializerClass=org.apache.kafka.common.serialization.ByteArraySerializer");
  }

这看起来还不错。要记住的事情:

  • 如果/当JVM在聚合周期中死亡的一半时会发生什么?不在乎,然后冷却,否则您可能想调查PersistentAggregationRepository存储/重播消息,尽管您可以重播您从Kafka丢失的消息(这是我最大的操作问题(
  • 关注,请考虑运行时控制。骆驼令人震惊,因为没有真正告诉您运行时发生什么。诸如聚合器中的失控方法(即非常贪婪的正则表达式(会使您对当前的聚合交换状态几乎没有任何了解,而JMX可能不会告诉您太多有关正在发生的事情的信息。
  • 我将使用AggregateController使您能够外部强制完成交易所的完成,因此您可以执行诸如对骆驼的关闭之类的事情,然后致电以完成机上交换

相关内容

最新更新