ServiceStack消息过滤



我一直在使用ServiceStack MQ Server/Client来增强我的平台中基于消息的体系结构,它一直完美地工作着。我现在正在尝试做一些我认为不受SS消息生产者/消费者支持的事情。

本质上,我在一个集中的数据中心发射消息(事件),我在一个不可靠的网络上有大约2000个分散的节点遍布美国,这些节点需要潜在地知道这个事件,但是事件只需要针对大约2000个节点中的一个。我需要具有Pub/Sub的任意命名通道的灵活性,但需要MQ的持久性。我从Pub/Sub开始,但是网络太不可靠了,所以我把解决方案改为使用RedisMQServer。我有它的工作,但想确保我没有错过的东西在界面。我很好奇SS的创建者是否考虑过这个用例,如果是的话,讨论的结果是什么?这确实与使用POCO来驱动消息消费的结果/操作的概念相抵触。也许这就是原因?

这是我的制作人

    public ExpressLightServiceResponse Get(ExpressLightServiceRequest query)
    {
        var result = new ExpressLightServiceResponse();
        var assemblyBuilder = Thread.GetDomain().DefineDynamicAssembly(new AssemblyName("ArbitaryNamespace"), AssemblyBuilderAccess.Run);
        var moduleBuilder = assemblyBuilder.DefineDynamicModule("ModuleName");
        var typeBuilder = moduleBuilder.DefineType(string.Format("EventA{0}", query.Store), TypeAttributes.Public);
        typeBuilder.DefineDefaultConstructor(MethodAttributes.Public);
        var newType = typeBuilder.CreateType();
        using (var messageProducer = _messageService.CreateMessageProducer())
        {
            var message = MessageFactory.Create(newType.CreateInstance());
            messageProducer.Publish(message);
        }
        return result;
    }

这是我的消费者

public class ServerAppHost : AppHostHttpListenerBase
{
    private readonly string _store;
    public string StoreQueue => $"EventA{_store}";
    public ServerAppHost(string store) : base("Express Light Server", typeof(PubSubServiceStatsService).Assembly)
    {
        _store = store;
    }
    public override void Configure(Container container)
    {
        container.Register<IRedisClientsManager>(new PooledRedisClientManager(ConfigurationManager.ConnectionStrings["Redis"].ConnectionString));
        var assemblyBuilder = Thread.GetDomain().DefineDynamicAssembly(new AssemblyName("ArbitaryNamespace"), AssemblyBuilderAccess.Run);
        var moduleBuilder = assemblyBuilder.DefineDynamicModule("ModuleName");
        var typeBuilder = moduleBuilder.DefineType(StoreQueue, TypeAttributes.Public);
        typeBuilder.DefineDefaultConstructor(MethodAttributes.Public);
        var newType = typeBuilder.CreateType();
        var mi = typeof(Temp).GetMethod("Foo");
        var fooRef = mi.MakeGenericMethod(newType);
        fooRef.Invoke(new Temp(container.Resolve<IRedisClientsManager>()), null);
    }
}
public class Temp
{
    private readonly IRedisClientsManager _redisClientsManager;
    public Temp(IRedisClientsManager redisClientsManager)
    {
        _redisClientsManager = redisClientsManager;
    }
    public void Foo<T>()
    {
        var mqService = new RedisMqServer(_redisClientsManager);
        mqService.RegisterHandler<T>(DoWork);
        mqService.Start();
    }
    private object DoWork<T>(IMessage<T> arg)
    {
        //Do work
        return null;
    }
}

这给了我Pub/Sub的灵活性和Queue的持久性。有没有人看到/知道一个更"原生"的方式来实现这一点?

应该只有1 MQ主机在您的AppHost中注册,所以我首先将其从包装器类中删除,并让它只是注册处理程序,例如:

public override void Configure(Container container)
{
    //...
    container.Register<IMessageService>(
        c => new RedisMqServer(c.Resolve<IRedisClientsManager>());
    var mqServer = container.Resolve<IMessageService>();
    fooRef.Invoke(new Temp(mqServer), null);
    mqServer.Start();
}
public class Temp
{
    private readonly IMessageService mqServer;
    public Temp(IMessageService mqServer)
    {
        this.mqServer = mqServer;
    }
    public void Foo<T>() => mqService.RegisterHandler<T>(DoWork);
}

但是这种方法并不适合ServiceStack, ServiceStack鼓励使用代码优先的消息,它定义了客户端/服务器用来处理发送和接收的消息的服务契约。因此,如果你想使用ServiceStack发送自定义消息,我建议每个消息都有一个单独的类,或者有一个通用的类型,如SendEvent,其中消息或事件类型是类的属性。

否则,如果你想继续使用自定义消息不使用RedisMqServer,你可以使用专用的MQ,如Rabbit MQ,或者如果你喜欢直接使用Redis List -这是所有Redis MQ使用的数据结构。

最新更新