我一直在使用AMQP之间的两个微型服务之间建立异步消息传递。我们想促进每种服务的单独域对象的使用,这意味着每个服务都必须定义自己在队列中传递的任何对象的副本。
我们在生产者和消费者端都使用Jackson2JsonMessageConverter
,我们正在使用Java DSL将流量连接到/from the列队。
我确定有一种方法可以做到这一点,但是它逃脱了我:我希望消费者一面忽略从生产者中传递的__TypeID__
标头,因为消费者可能对该事件有不同的表示(并且它可能会包含在其他Java软件包中)。
看来正在完成工作,以便如果使用注释@RabbitListener
,则会派生inferredArgumentType
参数并覆盖标头信息。这正是我想做的,但是我想使用Java DSL来做到这一点。我尚未找到一种干净的方法来做到这一点,也许我只是错过了一些明显的东西。在使用以下DSL时,得出类型似乎很简单:
return IntegrationFlows
.from(
Amqp.inboundAdapter(factory, queueRemoteTaskStatus())
.concurrentConsumers(10)
.errorHandler(errorHandler)
.messageConverter(messageConverter)
)
.channel(channelRemoteTaskStatusIn())
.handle(listener, "handleRemoteTaskStatus")
.get();
但是,这会导致ClassNotFound
异常。到目前为止,我发现解决此问题的唯一方法是设置一个自定义消息转换器,这需要对类型的明确定义。
public class ForcedTypeJsonMessageConverter extends Jackson2JsonMessageConverter {
ForcedTypeJsonMessageConverter(final Class<?> forcedType) {
setClassMapper(new ClassMapper() {
@Override
public void fromClass(Class<?> clazz, MessageProperties properties) {
//this class is only used for inbound marshalling.
}
@Override
public Class<?> toClass(MessageProperties properties) {
return forcedType;
}
});
}
}
我真的希望这是派生的,因此开发人员不必真正处理此问题。
有一个更简单的方法吗?
最简单的方法是用TypeIdMapping
(setIdClassMapping()
)配置Jackson Converter的DefaultJackson2JavaTypeMapper
。
在发送系统上,映射foo:com.one.Foo
和接收系统映射foo:com.two.Foo
。
然后,__TypeId__
标头获取foo
,并且接收系统将其映射到Foo
的表示。
编辑
另一个选项是将afterReceiveMessagePostProcessor
添加到入站通道适配器的侦听器容器中 - 它可以更改__TypeId__
标头。