我有一个简单的消息传递框架,它是围绕"Host"的一个实例构建的,该实例连接到"Client"的多个实例。每个实例通过向其推送消息来进行通信。然后,实例使用单独的任务按照接收消息的顺序处理自己的消息。
以下是代码的简化版本:
interface IMessage
{
}
class Host
{
ConcurrentBag<Client> _Clients = new ConcurrentBag<Client>();
BlockingCollection<IMessage> _IncomingMessages = new BlockingCollection<IMessage>();
public Host()
{
Task.Factory.StartNew(() =>
{
foreach (var message in _IncomingMessages.GetConsumingEnumerable())
{
ProcessIncomingMessage(message);
}
});
}
public void AddClient(Client client)
{
_Clients.Add(client);
}
private void ProcessIncomingMessage(IMessage message)
{
// consume the message and typically generate new messages
// For now just echo the message back
Broadcast(message);
}
private void Broadcast(IMessage message)
{
foreach (var client in _Clients)
{
client.PushMessage(message);
}
}
}
class Client
{
private Host _Host;
private BlockingCollection<IMessage> _IncomingMessages = new BlockingCollection<IMessage>();
public Client(Host host)
{
Task.Factory.StartNew(() =>
{
foreach (var message in _IncomingMessages.GetConsumingEnumerable())
{
ProcessIncomingMessage(message);
}
});
_Host = host;
_Host.AddClient(this);
}
public void PushMessage(IMessage message)
{
_IncomingMessages.Add(message);
}
private void ProcessIncomingMessage(IMessage message)
{
// interpret the message and update state
}
}
我的要求是,每个处理消息的实例都使用并行执行(如果可能的话),并且每个消息都按照每个类每次接收一个消息的顺序进行处理。
生成仅从GetConsumingEnumerable()中提取的新任务是个坏主意吗?
我觉得我可以使用反应式扩展和IOobservable来简化这一点,但我对使用Rx还相当陌生,不确定它的结构如何满足我的并行执行需求。
任何指导都将不胜感激!
使用Rx(具体地说,IObservable
)作为将消息传递到Host
,然后将消息从Host
传递到所有客户端的中介,当然可以让您的生活轻松很多。对代码的两个更改(您可以执行其中一个或另一个,或两者都执行)是将IObservable<IMessage>
传递给Host
作为其输入,并使其生成IObservable<IMessage>
,类似地,将IObservable<IMessage>
传递给Client
。
据我所见,Host
和Client
不需要了解彼此(这很好)。如果Host
必须主动管理客户端(例如,将它们踢开),这将不起作用——在这些情况下,需要更改下面的代码,让Host
管理客户端订阅,这并不重要,但肯定会涉及客户端向主机请求订阅,而主机持有生成的IDisposable
。
此外,下面的内容一般不会处理错误或取消订阅,添加这些内容不会有太大困难。它在很大程度上是一个骨架,具有与您相同的功能。
class Host
{
private Subject<IMessage> _outbound;
public Host(IObservable<IMessage> messages)
{
_outbound = new Subject<IMessage>();
messages.SubscribeOn(Scheduler.TaskPool).Subscribe(ProcessIncomingMessage);
}
private void ProcessIncomingMessage(IMessage message)
{
_outbound.OnNext(message); // just echo
}
public IObservable<IMessage> Messages { get { return _outbound.AsObservable(); } }
}
class Client
{
public Client(IObservable<IMessage> messages)
{
messages.SubscribeOn(Scheduler.TaskPool).Subscribe(ProcessIncomingMessage);
}
private void ProcessIncomingMessage(IMessage message)
{
// interpret the message and update state
}
}
那么用法很简单:
var source = new Subject<long>();
var host = new Host(source);
var client1 = new Client(host.Messages);
var client2 = new Client(host.Messages);
var client3 = new Client(host.Messages);
source.OnNext(1); // assuming each of these is an IMessage
source.OnNext(2);
source.OnNext(3);
当然,您可以使用任何IObservable
源来驱动Host
。
请注意,如果消息的处理是"长时间运行的",则需要在SubscribeOn
调用中使用Scheduler.NewThread
。否则,每个消息当前都被添加到要处理的Task Pool
中。