通过弹簧集成 DSL 提高消息处理的性能



我正在使用 Spring Integration dsl 接收来自 Kafka 的消息,然后在 Oracle、Couchbase 中解析、丰富和持久化,并将其发布到下线通道的另一个 Kafka 主题。

">

保留"和"发布"需要位于事务中,以便所有数据源/数据存储同步。如果 Couchbase、Oracle 或 Publish Kafka 主题不可用,则回滚事务。

同时,我不希望在处理消息时出现延迟,因为这是对业务用户的实时更新。

return IntegrationFlows
// .from(Jms.messageDrivenChannelAdapter(this.acarsMqListener)) //Get Message
// from MQ
.from(org.springframework.integration.jms.dsl.Jms
.messageDrivenChannelAdapter(org.springframework.integration.jms.dsl.Jms
.container(this.acarsMqConnectionFactory, this.acarsQueue)
.transactionManager(transactionManager(this.acarsMqConnectionFactory)).get()))
.wireTap(ACARS_WIRE_TAP_CHNL) // Log the raw messaged
.transform(agmTransformer, "parseXMLMessage") // Parse the AGM xml message
.handle(acarsProcessor, "pushAcarsRawData") // push raw acars data
.wireTap(ACARS_WIRE_TAP_CHNL_DYNAMODB) // Log the raw messaged
.transform(agmTransformer, "populateSmi") // Populate SMI
// .transform(agmTransformer, "populateSmi") //Populate SMI
.filter(acarsFilter, "filterMessageOnSmi") // Filter on SMI
.transform(agmTransformer, "populateImi") // Populate IMI
.filter(acarsFilter, "filterMessageOnSmiImi") // Filter on IMI
.transform(acarsProcessor, "processEvent") // Parse
.publishSubscribeChannel(
pubSub -> pubSub
.subscribe(flow -> flow.bridge(e -> e.order(Ordered.HIGHEST_PRECEDENCE))
.enrichHeaders(
h -> h.headerExpression(KafkaHeaders.MESSAGE_KEY, "payload.flightNbr")) // Add flight number as key
.transform("payload.message") // publish the transformed message
.handle(Kafka.outboundChannelAdapter(kafkaTemplate).topic(acarsKafkaTopic))) // publish to kafka
.subscribe(flow -> flow.channel(UPDATE_DATA_STORE_CHNL))) // send to a different channel to update couchbase
.get();

您能否建议在集成流程中可以做些什么来提高处理性能。

您应该分析您的应用程序以查看瓶颈所在。

通常,若要提高吞吐量,需要增加侦听器容器中的concurrency以并行处理消息。使用 Kafka,您至少需要与并发线程一样多的分区。

如果瓶颈在某个下游组件中,则增加并发性可能无济于事;因此需要进行性能分析。

相关内容

  • 没有找到相关文章

最新更新