是否有任何方法可以使用EasyNetQ同步使用RabbitMQ的原始字节消息?
我需要保证对来自不以EasyNetQ格式发布的系统的消息进行有序处理和确认。我知道使用者在单个线程上运行,但IAdvancedBus
接口只提供了一种使用原始消息的方法:
IDisposable Consume(IQueue queue, Func<byte[], MessageProperties, MessageReceivedInfo, Task> onMessage);
Task
返回类型意味着使用者正在异步运行回调,因此可能会无序处理消息。
如果没有,有什么想法可以更改代码来支持它吗?我会制作接口方法:
IDisposable Consume(IQueue queue, Action<byte[], MessageProperties, MessageReceivedInfo> onMessage);
并在RabbitAdvancedBus
中实现它,但我不确定代码的确切位置。
我收到了一个在EasyNetQ Google Group中有效的响应:
要同步执行,您可以这样做:
bus.Advanced.Consume(queue, (bytes, properties, info) =>
{
// do your synchronous work.....
return Task.CompletedTask;
});
或者添加一个扩展:
using System;
using System.Threading.Tasks;
using EasyNetQ;
using EasyNetQ.Consumer;
using EasyNetQ.Loggers;
using EasyNetQ.Topology;
namespace ConsoleApplication4
{
public static class RabbitAdvancedBusConsumeExtension
{
public static IDisposable Consume(this IAdvancedBus bus, IQueue queue, Action<byte[], MessageProperties, MessageReceivedInfo> onMessage)
{
return bus.Consume(queue, (bytes, properties, info) => ExecuteSynchronously(() => onMessage(bytes, properties, info)));
}
public static IDisposable Consume(this IAdvancedBus bus, IQueue queue, Action<byte[], MessageProperties, MessageReceivedInfo> onMessage, Action<IConsumerConfiguration> configure)
{
return bus.Consume(queue, (bytes, properties, info) => ExecuteSynchronously(() => onMessage(bytes, properties, info)), configure);
}
private static Task ExecuteSynchronously(Action action)
{
var tcs = new TaskCompletionSource<object>();
try
{
action();
tcs.SetResult(null);
}
catch (Exception e)
{
tcs.SetException(e);
}
return tcs.Task;
}
}
class Program
{
static void Main(string[] args)
{
var bus = RabbitHutch.CreateBus("host=localhost", x => x.Register<IEasyNetQLogger>(s => new ConsoleLogger()));
var queue = bus.Advanced.QueueDeclare();
bus.Advanced.Consume(queue, (bytes, properties, info) =>
{
// .....
});
}
}
}
更新:此功能在0.52.0.410:版本中添加
https://github.com/EasyNetQ/EasyNetQ/pull/505
这是一个有趣的问题。我自己不是EasyNetQ专家,也许其他人会来给你一个更好的答案然而我已经熟悉EasyNetQ代码库大约一年了,在我看来,在连接消费者时(因此在调用消费者时)很难了解发生了什么。
我首先要指出的是,仅仅通过更改方法的签名,并不能保证消息按顺序处理。例如,看看你建议的接口的这个实现:
IDisposable Consume(IQueue queue, Action<byte[], MessageProperties, MessageReceivedInfo> onMessage)
{
Func<byte[], MessageProperties, MessageReceivedInfo, Task> taskWrapper = (bytes, properties, info) =>
{
onMessage(bytes, properties, info);
return new Task(() => { });
};
Consume(queue, taskWrapper);
}
它调用了原始的Consume
方法,我们真的不知道之后会发生什么,对吧?
如果我站在你的立场上,我会做以下事情之一:
- 使用官方RabbitMq客户端并在那里消费消息表单(这并没有那么棘手!)
- 也许可以看看RawRabbit,这是我一直在贡献的RabbitMq之上的一个薄层(使用vNext标准)。它只支持使用消息的异步签名,但编写
Subscriber.cs
的同步实现(使用类似AsyncEx的同步库)应该不会很困难 - 更改业务逻辑的建模。我不确定这是否适用于您的情况,但一般来说,如果每个消息都以正确的顺序处理是至关重要的,那么您应该以某种方式对其进行建模,以便consume方法可以验证此消息是否是下一条消息。(此外,我不认为EasyNetQ保证消息序列,所以您可能需要为每个新版本的框架验证它)
希望这能有所帮助!