SpringBoot RabbitMQ-如何减少许多主题(事件)的样板



我想知道在SpringBoot中初始化许多RabbitMQ队列/绑定时,是否有一种方法可以减少样板代码的数量?

按照事件驱动的方法,我的应用程序会产生大约50种类型的事件(稍后它将被拆分为几个较小的应用程序,但仍然如此)。每个事件都以"主题"类型进行交换。一些事件被其他应用程序消耗,一些事件被发送它们的同一应用程序额外消耗。

让我们考虑一下发布和自我消费的情况。

的SpringBoot中,每个事件我都需要声明:

  1. 配置中的路由密钥名称(如"event.item.puyed")
  2. 在同一应用程序中使用该事件的队列名称("队列.事件.项目.已购买")
  3. 匹配的配置属性类字段或保持属性名称的代码中的变量itemPurchasedRoutingKey或常量(如${event.item.puyed})
  4. 用于创建队列的bean(具有以事件名称为特征的名称)itemPurchasedQueue
  5. 用于绑定创建的bean(具有事件名称)和路由密钥名称。类似itemPurchasedBinding使用itemPurchasedQueue.bind(…itemPurchasedRoutingKey)构造
  6. 事件的RabbitListener,带有包含队列名称的注释(无法在运行时定义)

So-6个地方"购买的物品"以一种或另一种形式被提及。样板代码的数量简直要了我的命:)如果有50个事件,很容易出错——在添加新事件时,需要记住将其添加到6个位置。

理想情况下,对于每个活动,我都想:

  1. 在配置中指定路由密钥。队列名称可以通过添加通用前缀(特定于应用程序)来构建
  2. 使用一些注释或替代RabbitListener,它会自动声明队列(通过路由关键字+前缀),绑定到它,并侦听事件

有办法优化它吗?我考虑过自定义注释,但RabbitListener不喜欢动态队列名称,如果我在一些util方法中声明队列和绑定,spring-boot就找不到它们的bean。也许有一种方法可以在代码中声明所有这些东西,但我相信这不是Spring的方法:)

所以我最终使用了手动bean声明,并为每个bean 使用了1 bind()方法

@Configuration
@EnableConfigurationProperties(RabbitProperties::class)
class RabbitConfiguration(
private val properties: RabbitProperties,
private val connectionFactory: ConnectionFactory
) {
@Bean
fun admin() = RabbitAdmin(connectionFactory)
@Bean
fun exchange() = TopicExchange(properties.template.exchange)
@Bean
fun rabbitMessageConverter() = Jackson2JsonMessageConverter(
jacksonObjectMapper()
.registerModule(JavaTimeModule())
.registerModule(Jdk8Module())
.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
.enable(DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL)
)
@Value("${okko.rabbit.queue-prefix}")
lateinit var queuePrefix: String
fun <T> bind(routingKey: String, listener: (T) -> Mono<Void>): SimpleMessageListenerContainer {
val queueName = "$queuePrefix.$routingKey"
val queue = Queue(queueName)
admin().declareQueue(queue)
admin().declareBinding(BindingBuilder.bind(queue).to(exchange()).with(routingKey)!!)
val container = SimpleMessageListenerContainer(connectionFactory)
container.addQueueNames(queueName)
container.setMessageListener(MessageListenerAdapter(MessageHandler(listener), rabbitMessageConverter()))
return container
}
internal class MessageHandler<T>(private val listener: (T) -> Mono<Void>) {
// NOTE: don't change name of this method, rabbit needs it
fun handleMessage(message: T) {
listener.invoke(message).subscribeOn(Schedulers.elastic()).subscribe()
}
}
}

@Service
@Configuration
class EventConsumerRabbit(
private val config: RabbitConfiguration,
private val routingKeys: RabbitEventRoutingKeyConfig
) {
@Bean
fun event1() = handle(routingKeys.event1)
@Bean
fun event2() = handle(routingKeys.event2)
...
private fun<T> handle(routingKey: String): Mono<Void> = config.bind<T>(routingKey) {
log.debug("consume rabbit event: $it")
... // handle event, return Mono<Void>
}
companion object {
private val log by logger()
}
}
@Configuration
@ConfigurationProperties("my.rabbit.routing-key.event")
class RabbitEventRoutingKeyConfig {
lateinit var event1: String
lateinit var event2: String
...
}

最新更新