如何创建<T>从 MSMQ 消息队列读取的 IObservable?



我将从我们的ASP.NET网站上删除我们的电子邮件系统,该系统用于立即发送电子邮件,并在单独的服务中处理请求,以减少网站上的工作量。我试图围绕一组接口来设计它,这样如果我愿意的话,我就可以交换实现,但最初它将基于消息队列(MSMQ)来向队列发送请求,让服务接收传入请求,然后处理它们。我目前大致定义了以下接口:

// Sends one or more requests to be processed somehow
public interface IRequestSender
{
    void Send(IEnumerable<Request> requests);
}
// Listens for incoming requests and passes them to an observer to do the real work
public interface IRequestListener : IObservable<Request>
{
    void Start();
    void Stop();
}
// Processes a request given to it by a IRequestListener
public interface IRequestProcessor : IObserver<Request>
{
}

您会注意到Listener和Processor使用了可观察的模式,因为我认为这似乎最适合。

我的问题是弄清楚如何编写从MSMQ接收的IRequestListener的实现,基本上是如何创建合适的IObservable<T>

我发现的第一个选项是根据MSDN文档中给出的示例从头开始创建IObservable<T>,但这似乎需要做很多管道工作。

另一种选择是使用反应式扩展,因为它似乎是为了更容易创建可观察性。我发现最接近将Rx与MSMQ一起使用的是以下页面:

  • 使用异步模式(queue.BeginReceive、queue.EndReceive)为MSMQ消息接收使用响应扩展(Rx)
  • MSMQ使用Rx-MSMQ接收超时问题的代码段

但我不确定如何将这些示例应用于我的IRequestListener接口。

任何其他想法也欢迎,如果合适的话,甚至可以修改我的基本设计。

我最初确实使用了FromAsyncPattern,但后来为它编写了一个类,因为它能更好地处理超时和中毒消息。一旦启动,队列无论如何都是热门的可观察对象。也可以使用Observable.Defer使其更接近Rx,而不是"开始/停止"。

以下是QueueObservable的基本实现。您可以先拨打ListenReceive

Subject<T> Subject = new Subject<T>();
protected void ListenReceive()
{
    Queue.BeginReceive(MessageQueue.InfiniteTimeout, null, OnReceive);
}
protected void OnReceive(IAsyncResult ar)
{
    Message message = null;
    try
    {
        message = Queue.EndReceive(ar);
    }
    catch (TimeoutException ex)
    {
        //retry?
    }
    if (message != null)
        Subject.OnNext((T) message.Body);
    Thread.Yield();
    if (!IsDisposed)
        ListenReceive();
}    
public IObservable<T> AsObservable()
{
        return Subject;
}

最新更新