RABBITMQ-对话vs corseLationId-更适合跟踪特定请求



RabbitMQ似乎具有两个非常相似的属性,我并不完全理解差异。ConversationIdCorrelationId

我的用例如下。我有一个生成Guid的网站。该网站称为API,并将该唯一标识符添加到HttpRequest标题中。反过来,这将发布给RabbitMQ的消息。该消息是由第一个消费者处理的,并将其他地方传递给另一个消费者,依此类推。

为了记录目的,我想记录一个标识符,该标识符将初始请求与所有后续操作联系起来。这对于整个应用程序的不同部分的旅程应该是独一无二的。因此。当登录到Serilog/Elasticsearch之类的内容时,这将变得容易查看哪个请求触发了初始请求,并且在整个应用程序中,该请求的所有日志条目都可以合并在一起。

我创建了一个提供商,该提供商查看了标识符的传入HttpRequest。我称其为"相关性",但是我开始怀疑这是否应该被称为"对话"。就RabbitMQ而言,"对话"的想法更适合该模型,或者"相关性"更好?

这两个概念有什么区别?

在代码方面,我要做以下操作。首先在我的API中注册总线,并配置SendPublish以使用提供商的CorrelationId

// bus registration in the API
var busSettings = context.Resolve<BusSettings>();
// using AspNetCoreCorrelationIdProvider
var correlationIdProvider = context.Resolve<ICorrelationIdProvider>();
var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
    cfg.Host(
        new Uri(busSettings.HostAddress),
        h =>
        {
            h.Username(busSettings.Username);
            h.Password(busSettings.Password);
        });
    cfg.ConfigurePublish(x => x.UseSendExecute(sendContext =>
    {
        // which one is more appropriate
        //sendContext.ConversationId = correlationIdProvider.GetCorrelationId();
        sendContext.CorrelationId = correlationIdProvider.GetCorrelationId();
    }));
});

供参考,这是我的简单提供商接口

// define the interface
public interface ICorrelationIdProvider
{
    Guid GetCorrelationId();
}

和ASPNETCORE实现,该实现提取了调用客户端设置的唯一ID(即网站)。

public class AspNetCoreCorrelationIdProvider : ICorrelationIdProvider
{
    private IHttpContextAccessor _httpContextAccessor;
    public AspNetCoreCorrelationIdProvider(IHttpContextAccessor httpContextAccessor)
    {
        _httpContextAccessor = httpContextAccessor;
    }
    public Guid GetCorrelationId()
    {
        if (_httpContextAccessor.HttpContext.Request.Headers.TryGetValue("correlation-Id", out StringValues headers))
        {
            var header = headers.FirstOrDefault();
            if (Guid.TryParse(header, out Guid headerCorrelationId))
            {
                return headerCorrelationId;
            }
        }
        return Guid.NewGuid();
    }
}

最后,我的服务主机是简单的Windows Service应用程序,可以静置并消费已发布的消息。他们使用以下内容来抓住corserLationId,并很可能会在其他服务主机中发布给其他消费者。

public class MessageContextCorrelationIdProvider : ICorrelationIdProvider
{
    /// <summary>
    /// The consume context
    /// </summary>
    private readonly ConsumeContext _consumeContext;
    /// <summary>
    /// Initializes a new instance of the <see cref="MessageContextCorrelationIdProvider"/> class.
    /// </summary>
    /// <param name="consumeContext">The consume context.</param>
    public MessageContextCorrelationIdProvider(ConsumeContext consumeContext)
    {
        _consumeContext = consumeContext;
    }
    /// <summary>
    /// Gets the correlation identifier.
    /// </summary>
    /// <returns></returns>
    public Guid GetCorrelationId()
    {
        // correlationid or conversationIs?
        if (_consumeContext.CorrelationId.HasValue && _consumeContext.CorrelationId != Guid.Empty)
        {
            return _consumeContext.CorrelationId.Value;
        }
        return Guid.NewGuid();
    }
}

我在我的消费者中有一个记录仪,它使用该提供商提取CorrelationId

public async Task Consume(ConsumeContext<IMyEvent> context)
{
    var correlationId = _correlationProvider.GetCorrelationId();
    _logger.Info(correlationId, $"#### IMyEvent received for customer:{context.Message.CustomerId}");
    try
    {
        await _mediator.Send(new SomeOtherRequest(correlationId) { SomeObject: context.Message.SomeObject });
    }
    catch (Exception e)
    {
        _logger.Exception(e, correlationId, $"Exception:{e}");
        throw;
    }
    _logger.Info(correlationId, $"Finished processing: {DateTime.Now}");
}

阅读文档,它说了以下关于"对话"的信息:

对话是由发送或 已发布,没有现有上下文可用(例如 消息是通过使用ibus.send或ibus.publish发送或发布的。如果 现有上下文用于发送或发布消息, 对话ID被复制到新信息中,确保一组 同一对话中的消息具有相同的标识符。

现在,我开始认为我的术语混杂在一起,从技术上讲,这是一次对话(尽管"对话"就像"电话游戏")。

那么,在此用例中CorrelationId还是ConversationId?请帮助我正确术语!

在消息对话中(提示预告型乐谱),可以有一个消息(我告诉您要做一些事情,或者我告诉每个正在听某事发生的人)或多个消息(我告诉你要做点什么,你告诉别人,或者我告诉所有正在听的人,这些听众告诉他们的朋友,等等)。

使用MassTransit,从第一条消息到最终消息,正确使用,这些消息中的每个消息都具有相同的ConversationId。MassTransit将属性从ConsumeContext(未修改)复制到消耗消息期间的每个传出消息。这使得所有内容的一部分 trace - 对话。

但是,MassTransit默认设置了相关性ID。如果消息属性命名为correlationId(或CommandID或EventID),则可以自动设置它,也可以添加自己的名称。

如果在消耗的消息上存在相关性ID,则任何传出消息都将具有复制到initiatorID属性的相关性属性(原因和效果 - 启动的消耗消息启动了后续消息的创建)。这形成了一个可以遵循的链(或跨度术语中的跨度),以显示最初消息中消息的传播。

应将相关性视为命令或事件的标识符,以便在整个系统日志中可以看到该命令的效果。

在我看来,您来自HTTP的输入可能是启动器,因此将标识符复制到initiatorID中并为消息创建一个新的相关性,或者您可能只想将相同的标识符用于初始相关性并让您使用相同的标识符随后的消息将其用作发起者。

最新更新