我有下面的代码块,当运行时,我希望使用阻塞集合,并在Subscribe()操作中执行一些异步活动,在RX移动到观察者序列中的下一个条目之前等待这些活动。
实际发生的情况是,所有条目都是并行处理的(或看起来是并行的)。
目前的代码如下!
uploadQueues[workspaceId] //BlockingCollection<FileEvent>>
.GetConsumingEnumerable()
.ToObservable()
.SubscribeOn(new TaskPoolScheduler(new TaskFactory()))
.Subscribe(
async fileEvent =>
{
//process file event
Debug.WriteLine($"Upload queue processor 1: {fileEvent.Event} | {fileEvent.SourcePath} => {fileEvent.DestPath}");
await Task.Delay(TimeSpan.FromSeconds(1));
})
任何指向正确方向的指针都将不胜感激。。还想知道是否不是使用RX,而是简单地生成一个从阻塞集合消耗的长时间运行的TPL任务!
想法?
Rx查询的主要问题是async/await
部分,因为您没有处理或照顾SynchronizationContext.Current
但它们是整个代码的其他问题。
从SubscribeOn
开始,TaskPoolSchedule
就是IDisposable
类,您应该正确地实现和处理它。
此外,当您使用ToObservable()
方法时,它不使用任何IScheduler
-请在此处查看详细信息。您可以指定一个EventLoopScheduler
来保证只使用一个线程/资源来处理(尽管这不是必要的,因为GetConsumingEnumerable
已经锁定了一个线程来消耗)。
如果你只想模拟延迟,最好的方法是:
enumerableItems.ToObservable()
.Select(a => Observable.Return(a).DelaySubscription(TimeSpan.FromSeconds(1)))
.Concat() // Keep push order
.Subscribe(
fileEvent =>
{
Debug.WriteLine(fileEvent);
});
DelaySubscription
比Delay
效率高一点,只要注意,延迟是不同步的,所以可能会在另一个Thread.ThreadId
中结束,但这并不重要,因为顺序和序列将被保持。
现在,如果你想在Rx中使用async/await并保持其单一威胁,这是另一个问题的问题。。。
[投票支持@J.Lennon的回答]
欢迎使用Rx。正如J.Lennon在上面提到的,上面对您的代码提出了一些改进建议。
与任何同步或异步代码一样,我们希望考虑我们的资源,以及如何确保在之后进行清理。我们还需要考虑如何处理异常。除了这些问题之外,在处理异步(以及并发)时,我们还需要考虑取消。我们应该提供取消操作的能力,我们还应该确保取消操作符合消费者的期望。
虽然我认为Rx不是读取队列的最佳工具(请参阅http://introtorx.com/Content/v1.0.10621.0/01_WhyRx.html#Wont),如果你真的想强迫Rx进入图片中,那么你可能还想确保你满足上述问题。。
以下代码使用
CancellationToken
允许取消GetConsumingEnumerable()
(https://msdn.microsoft.com/en-us/library/dd395014(v=vs.110).aspx)- CCD_ 15,以确保单个线程专用于处理队列。这也使它序列化。(http://introtorx.com/Content/v1.0.10621.0/15_SchedulingAndThreading.html#EventLoopScheduler)
- 将订阅服务器中的异步工作移动到它所属的查询管道中
- J.Lennon的
Concat
提示确保您正在做的异步工作也是串行的。(http://introtorx.com/Content/v1.0.10621.0/12_CombiningSequences.html#Concat) - 所有错误处理现在将是最终订阅者(订阅此方法的代码)唯一关心的问题。在这里,您将使用
OnError
处理程序。然而,这让我回到了为什么我不喜欢Rx来处理队列。失败后你该怎么办?尝试重读信息并陷入无限循环,或者把它扔在地板上假装它没有发生?(http://introtorx.com/Content/v1.0.10621.0/03_LifetimeManagement.html#Subscribe)
样本代码
IDictionary<string, BlockingCollection<object>> uploadQueues = new Dictionary<string, BlockingCollection<Object>>();
public IObservable<object> ListenToQueueEvents(string workspaceId)
{
//Not sure what the return value is here, using `Object` as a place holder.
// Note that we are using an overload that takes a `CancellationToken`
return Observable.Create<object>((obs, ct) =>
{
var els = new EventLoopScheduler(ts => new Thread(ts)
{
IsBackground = true,//? or false? Should it stop the process from terminating if it still running?
Name = $"{workspaceId}Processor"
});
var subscription = uploadQueues[workspaceId] //BlockingCollection<FileEvent>>
.GetConsumingEnumerable(ct) //Allow cancellation while wating for next item
.ToObservable(els) //Serialise onto a single thread.
.Select(evt=>TheAsyncThingIWasDoingInTheSubscribe(evt).ToObservable())
.Concat()
.Subscribe(obs);
//You could try to dispose of the els (EventLoopScheduler), But I have had issues doing so in the past.
// Leaving as Background should allow it to die (but non deterministically) :-(
return Task.FromResult(subscription);
});
}
private static Task<object> TheAsyncThingIWasDoingInTheSubscribe(object evt)
{
//The return of the async thing you were doing in the subscribe
return Task.FromResult(new Object());
}