如何使用EasyNetQ同步使用RabbitMQ的原始字节消息



是否有任何方法可以使用EasyNetQ同步使用RabbitMQ的原始字节消息?

我需要保证对来自不以EasyNetQ格式发布的系统的消息进行有序处理和确认。我知道使用者在单个线程上运行,但IAdvancedBus接口只提供了一种使用原始消息的方法:

IDisposable Consume(IQueue queue, Func<byte[], MessageProperties, MessageReceivedInfo, Task> onMessage);

Task返回类型意味着使用者正在异步运行回调,因此可能会无序处理消息。

如果没有,有什么想法可以更改代码来支持它吗?我会制作接口方法:

IDisposable Consume(IQueue queue, Action<byte[], MessageProperties, MessageReceivedInfo> onMessage);

并在RabbitAdvancedBus中实现它,但我不确定代码的确切位置。

我收到了一个在EasyNetQ Google Group中有效的响应:

要同步执行,您可以这样做:

bus.Advanced.Consume(queue, (bytes, properties, info) =>
{
    // do your synchronous work.....
    return Task.CompletedTask;
});

或者添加一个扩展:

using System;
using System.Threading.Tasks;
using EasyNetQ;
using EasyNetQ.Consumer;
using EasyNetQ.Loggers;
using EasyNetQ.Topology;
namespace ConsoleApplication4
{
    public static class RabbitAdvancedBusConsumeExtension
    {
       public static IDisposable Consume(this IAdvancedBus bus, IQueue queue, Action<byte[], MessageProperties, MessageReceivedInfo> onMessage)
    {
        return bus.Consume(queue, (bytes, properties, info) => ExecuteSynchronously(() => onMessage(bytes, properties, info)));
    }
    public static IDisposable Consume(this IAdvancedBus bus, IQueue queue, Action<byte[], MessageProperties, MessageReceivedInfo> onMessage, Action<IConsumerConfiguration> configure)
    {
        return bus.Consume(queue, (bytes, properties, info) => ExecuteSynchronously(() => onMessage(bytes, properties, info)), configure);
    }
    private static Task ExecuteSynchronously(Action action)
    {
        var tcs = new TaskCompletionSource<object>();
        try
        {
            action();
            tcs.SetResult(null);
        }
        catch (Exception e)
        {
            tcs.SetException(e);
        }
        return tcs.Task;
    }
}
class Program
{
    static void Main(string[] args)
    {
        var bus = RabbitHutch.CreateBus("host=localhost", x => x.Register<IEasyNetQLogger>(s => new ConsoleLogger()));
        var queue = bus.Advanced.QueueDeclare();
        bus.Advanced.Consume(queue, (bytes, properties, info) =>
        {
            // .....
        });
    }
}
}

更新:此功能在0.52.0.410:版本中添加

https://github.com/EasyNetQ/EasyNetQ/pull/505

这是一个有趣的问题。我自己不是EasyNetQ专家,也许其他人会来给你一个更好的答案然而我已经熟悉EasyNetQ代码库大约一年了,在我看来,在连接消费者时(因此在调用消费者时)很难了解发生了什么。

我首先要指出的是,仅仅通过更改方法的签名,并不能保证消息按顺序处理。例如,看看你建议的接口的这个实现:

IDisposable Consume(IQueue queue, Action<byte[], MessageProperties, MessageReceivedInfo> onMessage)
{
    Func<byte[], MessageProperties, MessageReceivedInfo, Task> taskWrapper = (bytes, properties, info) =>
    {
        onMessage(bytes, properties, info);
        return new Task(() => { });
    };
    Consume(queue, taskWrapper);
}

它调用了原始的Consume方法,我们真的不知道之后会发生什么,对吧?

如果我站在你的立场上,我会做以下事情之一:

  1. 使用官方RabbitMq客户端并在那里消费消息表单(这并没有那么棘手!)
  2. 也许可以看看RawRabbit,这是我一直在贡献的RabbitMq之上的一个薄层(使用vNext标准)。它只支持使用消息的异步签名,但编写Subscriber.cs的同步实现(使用类似AsyncEx的同步库)应该不会很困难
  3. 更改业务逻辑的建模。我不确定这是否适用于您的情况,但一般来说,如果每个消息都以正确的顺序处理是至关重要的,那么您应该以某种方式对其进行建模,以便consume方法可以验证此消息是否是下一条消息。(此外,我不认为EasyNetQ保证消息序列,所以您可能需要为每个新版本的框架验证它)

希望这能有所帮助!

最新更新