我一直在使用ServiceStack MQ Server/Client来增强我的平台中基于消息的体系结构,它一直完美地工作着。我现在正在尝试做一些我认为不受SS消息生产者/消费者支持的事情。
本质上,我在一个集中的数据中心发射消息(事件),我在一个不可靠的网络上有大约2000个分散的节点遍布美国,这些节点需要潜在地知道这个事件,但是事件只需要针对大约2000个节点中的一个。我需要具有Pub/Sub的任意命名通道的灵活性,但需要MQ的持久性。我从Pub/Sub开始,但是网络太不可靠了,所以我把解决方案改为使用RedisMQServer。我有它的工作,但想确保我没有错过的东西在界面。我很好奇SS的创建者是否考虑过这个用例,如果是的话,讨论的结果是什么?这确实与使用POCO来驱动消息消费的结果/操作的概念相抵触。也许这就是原因?
这是我的制作人
public ExpressLightServiceResponse Get(ExpressLightServiceRequest query)
{
var result = new ExpressLightServiceResponse();
var assemblyBuilder = Thread.GetDomain().DefineDynamicAssembly(new AssemblyName("ArbitaryNamespace"), AssemblyBuilderAccess.Run);
var moduleBuilder = assemblyBuilder.DefineDynamicModule("ModuleName");
var typeBuilder = moduleBuilder.DefineType(string.Format("EventA{0}", query.Store), TypeAttributes.Public);
typeBuilder.DefineDefaultConstructor(MethodAttributes.Public);
var newType = typeBuilder.CreateType();
using (var messageProducer = _messageService.CreateMessageProducer())
{
var message = MessageFactory.Create(newType.CreateInstance());
messageProducer.Publish(message);
}
return result;
}
这是我的消费者
public class ServerAppHost : AppHostHttpListenerBase
{
private readonly string _store;
public string StoreQueue => $"EventA{_store}";
public ServerAppHost(string store) : base("Express Light Server", typeof(PubSubServiceStatsService).Assembly)
{
_store = store;
}
public override void Configure(Container container)
{
container.Register<IRedisClientsManager>(new PooledRedisClientManager(ConfigurationManager.ConnectionStrings["Redis"].ConnectionString));
var assemblyBuilder = Thread.GetDomain().DefineDynamicAssembly(new AssemblyName("ArbitaryNamespace"), AssemblyBuilderAccess.Run);
var moduleBuilder = assemblyBuilder.DefineDynamicModule("ModuleName");
var typeBuilder = moduleBuilder.DefineType(StoreQueue, TypeAttributes.Public);
typeBuilder.DefineDefaultConstructor(MethodAttributes.Public);
var newType = typeBuilder.CreateType();
var mi = typeof(Temp).GetMethod("Foo");
var fooRef = mi.MakeGenericMethod(newType);
fooRef.Invoke(new Temp(container.Resolve<IRedisClientsManager>()), null);
}
}
public class Temp
{
private readonly IRedisClientsManager _redisClientsManager;
public Temp(IRedisClientsManager redisClientsManager)
{
_redisClientsManager = redisClientsManager;
}
public void Foo<T>()
{
var mqService = new RedisMqServer(_redisClientsManager);
mqService.RegisterHandler<T>(DoWork);
mqService.Start();
}
private object DoWork<T>(IMessage<T> arg)
{
//Do work
return null;
}
}
这给了我Pub/Sub的灵活性和Queue的持久性。有没有人看到/知道一个更"原生"的方式来实现这一点?
应该只有1 MQ主机在您的AppHost中注册,所以我首先将其从包装器类中删除,并让它只是注册处理程序,例如:
public override void Configure(Container container)
{
//...
container.Register<IMessageService>(
c => new RedisMqServer(c.Resolve<IRedisClientsManager>());
var mqServer = container.Resolve<IMessageService>();
fooRef.Invoke(new Temp(mqServer), null);
mqServer.Start();
}
public class Temp
{
private readonly IMessageService mqServer;
public Temp(IMessageService mqServer)
{
this.mqServer = mqServer;
}
public void Foo<T>() => mqService.RegisterHandler<T>(DoWork);
}
但是这种方法并不适合ServiceStack, ServiceStack鼓励使用代码优先的消息,它定义了客户端/服务器用来处理发送和接收的消息的服务契约。因此,如果你想使用ServiceStack发送自定义消息,我建议每个消息都有一个单独的类,或者有一个通用的类型,如SendEvent
,其中消息或事件类型是类的属性。
否则,如果你想继续使用自定义消息不使用RedisMqServer
,你可以使用专用的MQ,如Rabbit MQ,或者如果你喜欢直接使用Redis List -这是所有Redis MQ使用的数据结构。