MassTransit with RabbitMq 请求/响应 由于网段的原因,回复地址错误



我有一个在公共交通中使用请求/响应消息的Web应用程序。 这适用于测试环境,没问题。

但是,在客户部署方面,我们面临着一个问题。在客户站点,我们有两个网段 A 和 B。执行数据库调用的组件位于段 A 中,Web 应用程序和 RabbitMq 服务器位于段 B 中。

由于安全限制,段 A 中的组件必须通过具有给定地址的负载均衡器。组件本身可以通过 Masstransit 连接到 RabbitMQ。目前为止,一切都好。

但是,段 B 上的 Web 组件使用 RabbitMq 服务器的直接地址。当 Web 组件现在启动请求/响应调用时,我可以看到消息到达段 A 中的组件。 但是,我看到消费者试图在"错误"的地址上调用RabbitMQ服务器。它使用 Web 组件用于发出请求的地址。但是,段 A 中的组件应回复"负载均衡器"地址。

有没有办法配置或告诉 RespondAsync 调用使用为该组件配置的连接地址?

当然,最简单的方法是让 Web 组件也通过负载均衡器进行连接,但由于网段/安全设置,负载均衡器只能从段 A 访问。

任何输入/帮助不胜感激。

我在 rabbitmq 联邦中遇到了类似的问题。这就是我所做的。

响应地址发送观察者

class ResponseAddressSendObserver : ISendObserver
{
private readonly string _hostUriString;
public ResponseAddressSendObserver(string hostUriString)
{
_hostUriString = hostUriString;
}
public Task PreSend<T>(SendContext<T> context)
where T : class
{
if (context.ResponseAddress != null)
{
// Send relative response address alongside the message
context.Headers.Set("RelativeResponseAddress",
context.ResponseAddress.AbsoluteUri.Substring(_hostUriString.Length));
}
return Task.CompletedTask;
}
...
}

响应地址消耗筛选器

class ResponseAddressConsumeFilter : IFilter<ConsumeContext>
{
private readonly string _hostUriString;
public ResponseAddressConsumeFilter(string hostUriString)
{
_hostUriString = hostUriString;
}
public Task Send(ConsumeContext context, IPipe<ConsumeContext> next)
{
var responseAddressOverride = GetResponseAddress(_hostUriString, context);
return next.Send(new ResponseAddressConsumeContext(responseAddressOverride, context));
}
public void Probe(ProbeContext context){}
private static Uri GetResponseAddress(string host, ConsumeContext context)
{
if (context.ResponseAddress == null)
return context.ResponseAddress;
object relativeResponseAddress;
if (!context.Headers.TryGetHeader("RelativeResponseAddress", out relativeResponseAddress) || !(relativeResponseAddress is string))
throw new InvalidOperationException("Message has ResponseAddress but doen't have RelativeResponseAddress header");
return new Uri(host + relativeResponseAddress);
}
}

响应地址消费上下文

class ResponseAddressConsumeContext : BaseConsumeContext
{
private readonly ConsumeContext _context;
public ResponseAddressConsumeContext(Uri responseAddressOverride, ConsumeContext context)
: base(context.ReceiveContext)
{
_context = context;
ResponseAddress = responseAddressOverride;
}
public override Uri ResponseAddress { get; }
public override bool TryGetMessage<T>(out ConsumeContext<T> consumeContext)
{
ConsumeContext<T> context;
if (_context.TryGetMessage(out context))
{
// the most hackish part in the whole arrangement
consumeContext = new MessageConsumeContext<T>(this, context.Message);
return true;
}
else
{
consumeContext = null;
return false;
}
}
// all other members just delegate to _context 
}

配置总线时

var result = MassTransit.Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(new Uri(hostAddress), h =>
{
h.Username(...);
h.Password(...);
});
cfg.UseFilter(new ResponseAddressConsumeFilter(hostAddress));
...
});
result.ConnectSendObserver(new ResponseAddressSendObserver(hostAddress));

因此,现在相对响应地址与消息一起发送并在接收端使用。

文档不建议使用观察器来修改任何内容,但在这种情况下应该没问题。

也许三个是更好的解决方案,但我还没有找到一个。呵呵

最新更新