我将从我们的ASP.NET网站上删除我们的电子邮件系统,该系统用于立即发送电子邮件,并在单独的服务中处理请求,以减少网站上的工作量。我试图围绕一组接口来设计它,这样如果我愿意的话,我就可以交换实现,但最初它将基于消息队列(MSMQ)来向队列发送请求,让服务接收传入请求,然后处理它们。我目前大致定义了以下接口:
// Sends one or more requests to be processed somehow
public interface IRequestSender
{
void Send(IEnumerable<Request> requests);
}
// Listens for incoming requests and passes them to an observer to do the real work
public interface IRequestListener : IObservable<Request>
{
void Start();
void Stop();
}
// Processes a request given to it by a IRequestListener
public interface IRequestProcessor : IObserver<Request>
{
}
您会注意到Listener和Processor使用了可观察的模式,因为我认为这似乎最适合。
我的问题是弄清楚如何编写从MSMQ接收的IRequestListener
的实现,基本上是如何创建合适的IObservable<T>
?
我发现的第一个选项是根据MSDN文档中给出的示例从头开始创建IObservable<T>
,但这似乎需要做很多管道工作。
另一种选择是使用反应式扩展,因为它似乎是为了更容易创建可观察性。我发现最接近将Rx与MSMQ一起使用的是以下页面:
- 使用异步模式(queue.BeginReceive、queue.EndReceive)为MSMQ消息接收使用响应扩展(Rx)
- MSMQ使用Rx-MSMQ接收超时问题的代码段
但我不确定如何将这些示例应用于我的IRequestListener
接口。
任何其他想法也欢迎,如果合适的话,甚至可以修改我的基本设计。
我最初确实使用了FromAsyncPattern,但后来为它编写了一个类,因为它能更好地处理超时和中毒消息。一旦启动,队列无论如何都是热门的可观察对象。也可以使用Observable.Defer
使其更接近Rx,而不是"开始/停止"。
以下是QueueObservable的基本实现。您可以先拨打ListenReceive
。
Subject<T> Subject = new Subject<T>();
protected void ListenReceive()
{
Queue.BeginReceive(MessageQueue.InfiniteTimeout, null, OnReceive);
}
protected void OnReceive(IAsyncResult ar)
{
Message message = null;
try
{
message = Queue.EndReceive(ar);
}
catch (TimeoutException ex)
{
//retry?
}
if (message != null)
Subject.OnNext((T) message.Body);
Thread.Yield();
if (!IsDisposed)
ListenReceive();
}
public IObservable<T> AsObservable()
{
return Subject;
}