反应式扩展等待队列为空



我有以下异步队列处理路由。

      var commandQueue = new BlockingCollection<MyCommand>();
      commandQueue
            .GetConsumingEnumerable()
            .ToObservable(new LimitedConcurrencyLevelTaskPoolScheduler(5))
            .Subscribe(c =>
                           {
                               try
                               {
                                   ProcessCommand(c);
                               }
                               catch (Exception ex)
                               {
                                   Trace.TraceError(ex.ToString());
                               }
                           }
            );

在一个特定场景中(当我要获取一些数据时),我需要在出去获取数据之前确保我的 commandQueue 为空。此操作应同步发生。基本上,我想做一些类似的事情

  public void GetData()
  {
     commandQueue.WaitForEmpty(); 
     // could potentially be expressed: 
     // while (commandQueue.Count > 0) Thread.Sleep(10);
     return GoGetTheData()
  }

我意识到在理想情况下,所有调用者都将"获取数据"异步...但有时它必须以同步的方式发生......因此,我需要等待命令队列为空,以确保数据的一致性和最新性。

我知道我如何使用手动重置事件轻松做到这一点......但我想知道System.Reactive/TPL是否有一种简单的方法。

谢谢。

这是一个比起初看起来更困难的问题。您需要生产者-消费者作业语义的BlockingCollection(和基础ConcurrentQueue)。但您还希望能够观察这些集合发生了什么,包括等待"空"信号。

最好的办法是从这里看一下JobQueueParallelJobQueue

http://social.msdn.microsoft.com/Forums/en-US/rx/thread/2817c6e5-e5a4-4aac-91c1-97ba7de88ff7

其中包括一个可观察的WhenQueueEmpty,可以控制同时运行的作业和排队的作业的数量(在本例中,作业与您的命令概念同义)。

你能用这个吗?

    var dataObservable = Observable.Start(() =>
    {
        commandQueue.WaitForEmpty(); 
        return GoGetTheData();
    });
在我看来,

你的要求是

  • 异步获取数据
  • 并行处理此数据(最大并行度为 5
  • 重复该过程

如果这些是您的要求,并且您没有被迫使用 BlockingCollection,即它不是现有的 API,那么我认为您可以单独使用 Rx 轻松解决此问题。

var dataRequestScheduler = new EventLoopScheduler();
var subscription = GetTheData()
    .Repeat()
    .SubscribeOn(dataRequestScheduler)
    .ObserveOn(Scheduler.TaskPool)//new LimitedConcurrencyLevelTaskPoolScheduler(5)
    .Subscribe(c =>
           {
               try
               {
                   ProcessCommand(c);
               }
               catch (Exception ex)
               {
                   Trace.TraceError(ex.ToString());
               }
           }
        );

其中 GetTheData 方法返回 IObservable

你可以利用 Observable.Start 和 Merge(5) 来获取最多 5 个线程,而无需自定义调度程序。

相关内容

  • 没有找到相关文章

最新更新