在MassTransit中发布通用(基于接口)消息和使用具体(具体类)消息



我有这个设计选择问题,不知何故,我很吃力,但徒劳无功。它只适用于特定的场景。

我正在尝试在MassTransit中发布和使用一条消息。例如:(Publisher-一个简单的控制台应用程序(

IShape message = GetShape(/**Business Logic will return some concrete object (Circle or square) based on some business inputs**/);  
bus.Publish(message);

(消费者-CircleConsumer和SquareConsumer(

class CircleConsumer : IConsumer<IShape>
{
public Task Consume(ConsumeContext<IShape> context)
{
var circle = context.Message as Circle;
return Task.CompletedTask;
}
}
class SquareConsumer : IConsumer<IShape>
{
public Task Consume(ConsumeContext<IShape> context)
{
var square = context.Message as Square;
return Task.CompletedTask;
}
}

(.Net Core托管服务项目中的消费者配置(

public static IHostBuilder CreateHostBuilder(string[] args) =>
Host.CreateDefaultBuilder(args)
.ConfigureServices((hostContext, services) =>
{
services.AddHostedService<Worker>()
.AddScoped<SquareConsumer>()
.AddScoped<CircleConsumer>()
.AddMassTransit(cfg =>
{
cfg.AddBus(ConfigureBus);
cfg.AddConsumer<SquareConsumer>();
cfg.AddConsumer<CircleConsumer>();
})
.AddSingleton<IBus>(provider => provider.GetRequiredService<IBusControl>())
.AddSingleton<IHostedService, TestMTConsumerHostedService>();
IBusControl ConfigureBus(IServiceProvider provider)
{
return Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(hostContext.Configuration["RabbmitMQ:Server:Host"], hostContext.Configuration["RabbmitMQ:Server:VirtualHost"], h =>
{
h.Username(hostContext.Configuration["RabbmitMQ:Auth:Username"]);
h.Password(hostContext.Configuration["RabbmitMQ:Auth:Password"]);
});
cfg.ReceiveEndpoint("CircleQueue", ep =>
{
ep.PrefetchCount = 16;
ep.UseMessageRetry(r => r.Interval(2, 100));
ep.Consumer<CircleConsumer>(provider);
});
cfg.ReceiveEndpoint("SquareQueue", ep =>
{
ep.PrefetchCount = 16;
ep.UseMessageRetry(r => r.Interval(2, 100));
ep.Consumer<SquareConsumer>(provider);
});
});
}
});

我的要求是让Publisher在不了解具体类的情况下发布消息。并且只有一个消费者根据消息类型接收消息。

但看起来两个消费者都收到了这个信息,而且选角也不起作用。Desired:假设当发布者发送Square对象时,只有Square消费者应该接收到调用。但是,就我而言,SquareConsumer和CircleConsumer都收到了消息。

作为一种变通方法,这是有效的:

  1. 始终发布具体对象。

    bus.Publish(new Square());
    
  2. 用具体类型声明使用者。

    class CircleConsumer : IConsumer<Circle>
    {
    public Task Consume(ConsumeContext<Circle> context)
    {
    var circle = context.Message;
    return Task.CompletedTask;
    }
    }
    class SquareConsumer : IConsumer<Square>
    {
    public Task Consume(ConsumeContext<Square> context)
    {
    var square = context.Message;
    return Task.CompletedTask;
    }
    }
    

但是,如果我能通用地做,那就太好了。

有什么建议吗?

如果您这样更改代码:

object message = GetShape(/**Business Logic will return some concrete object (Circle or square) based on some business inputs**/);  
bus.Publish(message);

和消费者

class CircleConsumer : IConsumer<Circle>
{
public Task Consume(ConsumeContext<Circle> context)
{
// do circle stuff
}
}
class SquareConsumer : IConsumer<Square>
{
public Task Consume(ConsumeContext<Square> context)
{
// do square stuff
}
}

它将按预期工作。

在这里我详细介绍一下变化:

  1. Publish与特定类型的实例一起使用意味着使用Publish<T>(T message)重载,该重载使用T作为消息类型。当显式地将消息类型设置为object时,我们调用Publish(object message)重载。在这种情况下,MassTransit将查找消息实现的所有类型
  2. 如果您的目标是使用具体类型的消息,则不需要使用共享接口类型的消息。您只需要为这些特定类型创建消费者。只要您像我在前一点中所描述的那样使用publish,消息就会同时发送到IShapeCircle交换机(例如(

更新:我最终采用了下面的方法。然而,我希望MassTransit能够在本质上路由纯粹多态的消息。这只是一个变通办法,而不是真正的解决方案。仍然欢迎使用新方法。

在具体的课堂上,我几乎没有借助于反思和朋友的方法,就做到了这一点。

出版商:

IShape message = GetShape(text);
var castedMessage = ReflectionHelper.CastToConcreteType(message);
bus.Publish(castedMessage);
public static class ReflectionHelper
{
public static object CastToConcreteType(object obj)
{
MethodInfo castMethod = obj.GetType().GetMethod("Cast").MakeGenericMethod(obj.GetType());
return castMethod.Invoke(null, new object[] { obj });
}
}

消息类型的接口和具体类:

public interface IShape
{
public string Color { get;  }
public string Name { get; }
}
public class Circle : IShape, ITypeCastable
{
public string Color => "Red";
public string Name => $"{Color}Circle";
T ITypeCastable.Cast<T>(object obj) => Cast<T>(obj);
public static T Cast<T>(object o) => (T)o;
}
public class Square : IShape, ITypeCastable
{
public string Color => "Green";
public string Name => $"{Color}Square";
T ITypeCastable.Cast<T>(object obj) => Cast<T>(obj);
public static T Cast<T>(object o) => (T)o;
}
public interface ITypeCastable
{
T Cast<T>(object obj);
}

使用者:在泛型类型供应中用具体类名替换接口,这是一个非常小的更改。

class CircleConsumer : IConsumer<Circle>
{
public Task Consume(ConsumeContext<Circle> context)
{
var circle = context.Message;
return Task.CompletedTask;
}
}
class SquareConsumer : IConsumer<Square>
{
public Task Consume(ConsumeContext<Square> context)
{
var square = context.Message;
return Task.CompletedTask;
}
}

最新更新