我有以下异步队列处理路由。
var commandQueue = new BlockingCollection<MyCommand>();
commandQueue
.GetConsumingEnumerable()
.ToObservable(new LimitedConcurrencyLevelTaskPoolScheduler(5))
.Subscribe(c =>
{
try
{
ProcessCommand(c);
}
catch (Exception ex)
{
Trace.TraceError(ex.ToString());
}
}
);
在一个特定场景中(当我要获取一些数据时),我需要在出去获取数据之前确保我的 commandQueue 为空。此操作应同步发生。基本上,我想做一些类似的事情
public void GetData()
{
commandQueue.WaitForEmpty();
// could potentially be expressed:
// while (commandQueue.Count > 0) Thread.Sleep(10);
return GoGetTheData()
}
我意识到在理想情况下,所有调用者都将"获取数据"异步...但有时它必须以同步的方式发生......因此,我需要等待命令队列为空,以确保数据的一致性和最新性。
我知道我如何使用手动重置事件轻松做到这一点......但我想知道System.Reactive/TPL是否有一种简单的方法。
谢谢。
这是一个比起初看起来更困难的问题。您需要生产者-消费者作业语义的BlockingCollection
(和基础ConcurrentQueue
)。但您还希望能够观察这些集合发生了什么,包括等待"空"信号。
最好的办法是从这里看一下JobQueue
和ParallelJobQueue
:
http://social.msdn.microsoft.com/Forums/en-US/rx/thread/2817c6e5-e5a4-4aac-91c1-97ba7de88ff7
其中包括一个可观察的WhenQueueEmpty
,可以控制同时运行的作业和排队的作业的数量(在本例中,作业与您的命令概念同义)。
你能用这个吗?
var dataObservable = Observable.Start(() =>
{
commandQueue.WaitForEmpty();
return GoGetTheData();
});
你的要求是
- 异步获取数据
- 并行处理此数据(最大并行度为 5 )
- 重复该过程
如果这些是您的要求,并且您没有被迫使用 BlockingCollection,即它不是现有的 API,那么我认为您可以单独使用 Rx 轻松解决此问题。
var dataRequestScheduler = new EventLoopScheduler();
var subscription = GetTheData()
.Repeat()
.SubscribeOn(dataRequestScheduler)
.ObserveOn(Scheduler.TaskPool)//new LimitedConcurrencyLevelTaskPoolScheduler(5)
.Subscribe(c =>
{
try
{
ProcessCommand(c);
}
catch (Exception ex)
{
Trace.TraceError(ex.ToString());
}
}
);
其中 GetTheData 方法返回 IObservable
你可以利用 Observable.Start 和 Merge(5) 来获取最多 5 个线程,而无需自定义调度程序。