我有一个用户:
我需要定期阅读和汇总来自KAFKA主题的消息,然后发布到另一个主题。LocalStorage不是一个选择。这就是我打算解决这个问题的方式,欢迎任何改进的建议
安排Kafka消息的汇总和发布,计划使用Contegregator EIP的完成Interval选项。这是代码。
@Autowired ObjectMapper objectMapper;
JacksonDataFormat jacksonDataFormat;
@PostConstruct
public void initialize(){
//objectMapper.setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE);
jacksonDataFormat = new JacksonDataFormat(objectMapper,EventMessage.class);
}
和路线:
public void configure() throws Exception {
from("kafka:localhost:9092?topic=item-events" +
"&groupId=aggregator-group-id&autoCommitIntervalMs=25000&autoOffsetReset=earliest&consumersCount=1")
.routeId("kafkapoller")
.unmarshal(jacksonDataFormat)
.aggregate(body().method("getItemId"), new EventAggregationStrategy()).completionInterval(20000)
.marshal().json(JsonLibrary.Jackson)
.to("kafka:localhost:9092?topic=item-events-aggregated&serializerClass=org.apache.kafka.common.serialization.ByteArraySerializer");
}
这看起来还不错。要记住的事情:
- 如果/当JVM在聚合周期中死亡的一半时会发生什么?不在乎,然后冷却,否则您可能想调查
PersistentAggregationRepository
存储/重播消息,尽管您可以重播您从Kafka丢失的消息(这是我最大的操作问题( - 关注,请考虑运行时控制。骆驼令人震惊,因为没有真正告诉您运行时发生什么。诸如聚合器中的失控方法(即非常贪婪的正则表达式(会使您对当前的聚合交换状态几乎没有任何了解,而JMX可能不会告诉您太多有关正在发生的事情的信息。
- 我将使用
AggregateController
使您能够外部强制完成交易所的完成,因此您可以执行诸如对骆驼的关闭之类的事情,然后致电以完成机上交换