在消息循环中使用Rx而不是BlockingCollection



我有一个简单的消息传递框架,它是围绕"Host"的一个实例构建的,该实例连接到"Client"的多个实例。每个实例通过向其推送消息来进行通信。然后,实例使用单独的任务按照接收消息的顺序处理自己的消息。

以下是代码的简化版本:

interface IMessage
{
}
class Host
{
    ConcurrentBag<Client> _Clients = new ConcurrentBag<Client>();
    BlockingCollection<IMessage> _IncomingMessages = new BlockingCollection<IMessage>();
    public Host()
    {
        Task.Factory.StartNew(() =>
        {
            foreach (var message in _IncomingMessages.GetConsumingEnumerable())
            {
                ProcessIncomingMessage(message);
            }
        });
    }
    public void AddClient(Client client)
    {
        _Clients.Add(client);
    }
    private void ProcessIncomingMessage(IMessage message)
    {
        // consume the message and typically generate new messages
        // For now just echo the message back
        Broadcast(message);  
    }
    private void Broadcast(IMessage message)
    {
        foreach (var client in _Clients)
        {
            client.PushMessage(message);
        }
    }
}

class Client
{
    private Host _Host;
    private BlockingCollection<IMessage> _IncomingMessages = new BlockingCollection<IMessage>();
    public Client(Host host)
    {
        Task.Factory.StartNew(() =>
        {
            foreach (var message in _IncomingMessages.GetConsumingEnumerable())
            {
                ProcessIncomingMessage(message);
            }
        });
        _Host = host;
        _Host.AddClient(this);
    }
    public void PushMessage(IMessage message)
    {
        _IncomingMessages.Add(message);
    }
    private void ProcessIncomingMessage(IMessage message)
    {
        // interpret the message and update state
    }
}

我的要求是,每个处理消息的实例都使用并行执行(如果可能的话),并且每个消息都按照每个类每次接收一个消息的顺序进行处理。

生成仅从GetConsumingEnumerable()中提取的新任务是个坏主意吗?

我觉得我可以使用反应式扩展和IOobservable来简化这一点,但我对使用Rx还相当陌生,不确定它的结构如何满足我的并行执行需求。

任何指导都将不胜感激!

使用Rx(具体地说,IObservable)作为将消息传递到Host,然后将消息从Host传递到所有客户端的中介,当然可以让您的生活轻松很多。对代码的两个更改(您可以执行其中一个或另一个,或两者都执行)是将IObservable<IMessage>传递给Host作为其输入,并使其生成IObservable<IMessage>,类似地,将IObservable<IMessage>传递给Client

据我所见,HostClient不需要了解彼此(这很好)。如果Host必须主动管理客户端(例如,将它们踢开),这将不起作用——在这些情况下,需要更改下面的代码,让Host管理客户端订阅,这并不重要,但肯定会涉及客户端向主机请求订阅,而主机持有生成的IDisposable

此外,下面的内容一般不会处理错误或取消订阅,添加这些内容不会有太大困难。它在很大程度上是一个骨架,具有与您相同的功能。

class Host
{
    private Subject<IMessage> _outbound;
    public Host(IObservable<IMessage> messages)
    {
        _outbound = new Subject<IMessage>();        
        messages.SubscribeOn(Scheduler.TaskPool).Subscribe(ProcessIncomingMessage);
    }
    private void ProcessIncomingMessage(IMessage message)
    {
        _outbound.OnNext(message); // just echo
    }
    public IObservable<IMessage> Messages { get { return _outbound.AsObservable(); } }
}
class Client
{
    public Client(IObservable<IMessage> messages)
    {
        messages.SubscribeOn(Scheduler.TaskPool).Subscribe(ProcessIncomingMessage);
    }
    private void ProcessIncomingMessage(IMessage message)
    {
        // interpret the message and update state
    }
}

那么用法很简单:

var source = new Subject<long>();
var host = new Host(source);
var client1 = new Client(host.Messages);
var client2 = new Client(host.Messages);
var client3 = new Client(host.Messages);
source.OnNext(1); // assuming each of these is an IMessage
source.OnNext(2);
source.OnNext(3);

当然,您可以使用任何IObservable源来驱动Host

请注意,如果消息的处理是"长时间运行的",则需要在SubscribeOn调用中使用Scheduler.NewThread。否则,每个消息当前都被添加到要处理的Task Pool中。

相关内容

  • 没有找到相关文章

最新更新