我正在努力解决与TaskPoolScheduler
上订阅Observable.FromEventPattern()
有关的一些并发问题。
让我用代码示例说明:
var dataStore = new DataStore();
Observable.FromEventPattern<DataChangedEventArgs>(dataStore, nameof(dataStore.DataChanged))
.SubscribeOn(TaskPoolScheduler.Default)
.Select(x => x.EventArgs)
.StartWith(new DataChangedEventArgs())
.Throttle(TimeSpan.FromMilliseconds(25))
.Select(x =>
{
Thread.Sleep(5000); // Simulate long-running calculation.
var result = 42;
return result;
})
.ObserveOn(new SynchronizationContextScheduler(SynchronizationContext.Current))
.Subscribe(result =>
{
// Do some interesting work with the result.
// ...
// Do something that makes the DataStore raise another event.
dataStore.RaiseDataChangedEvent(); // <- DEADLOCK!
});
dataStore.RaiseDataChangedEvent(); // <- Returns immediately, i.e. does NOT wait for long-running calculation.
dataStore.RaiseDataChangedEvent(); // <- Blocks while waiting for the previous long-running calculation to complete, then returns eventually.
我的问题是,当原始可观察的Observable.FromEventPattern()
发出任何新项目时(即,当DataStore
对象启动新的DataChanged
事件时,它们似乎被阻止了等待先前的项目以完成整个管道的流动。
由于订阅是在TaskPoolScheduler
上完成的,因此我期望每件新的项目简单地启动新任务,但是实际上,如果管道忙碌,事件的来源似乎会阻止事件调用。
我如何完成在其自己的任务/线程上执行每个新发射项目(凸起的事件(的订阅,从而使源对象永远不会阻止其内部DataChangedEvent.Invoke()
调用?
(当然,除了在UI线程上执行的Subscribe()
lambda以外 - 已经是这种情况。(
作为旁注:#RXNET Slack频道中提到的@Jonstodle,TaskPoolScheduler
的语义可能与我假设不同。具体来说,他说这可能会创建任务,并在该任务内的事件循环中执行订阅和产生值。但是,如果是这样,那么我发现 first 事件调用不会阻止(因为第二个事件(有点奇怪。在我看来,如果执行订阅的任务池任务足够异步,以至于souce不必在第一个调用中阻止,那么也不需要在第二个调用上进行阻止吗?
您遇到的问题仅仅是RX的工作方式 - 普通RX管道中产生的每个值都是,管道良好,一次仅处理一个值。如果RX管道的来源,在您的情况下,FromEventPattern<DataChangedEventArgs>
的产生速度比观察者处理的速度更快,则它们会在管道中排队。
规则是,管道中的每个观察者一次只能处理一个值。对于任何调度程序,不仅是TaskPoolScheduler
。
使它起作用的方法非常简单 - 您创建并行管道,然后将值合并到单个管道中。
这是更改:
Observable
.FromEventPattern<DataChangedEventArgs>(dataStore, nameof(dataStore.DataChanged))
.SubscribeOn(TaskPoolScheduler.Default)
.Select(x => x.EventArgs)
.StartWith(new DataChangedEventArgs())
.Throttle(TimeSpan.FromMilliseconds(25))
.SelectMany(x =>
Observable.Start(() =>
{
Thread.Sleep(5000); // Simulate long-running calculation.
var result = 42;
return result;
}))
.ObserveOn(new SynchronizationContextScheduler(SynchronizationContext.Current))
.Subscribe(result =>
{
// Do some interesting work with the result.
// ...
// Do something that makes the DataStore raise another event.
dataStore.RaiseDataChangedEvent();
});
.SelectMany(x => Observable.Start(() =>
取代了.Select(x =>
,允许该值是一个新的可观察订阅,该订阅立即运行,然后将值合并回单个可观察的。
您可能更喜欢将其写入语义上相同的.Select(x => Observable.Start(() => ...)).Merge()
。
这是一个简单的示例,显示了它的工作方式:
var source = new Subject<int>();
source
.SelectMany(x =>
Observable.Start(() =>
{
Thread.Sleep(1000);
return x * 2;
}))
.Subscribe(result =>
{
Console.WriteLine(result);
source.OnNext(result);
source.OnNext(result + 1);
});
source.OnNext(1);
它产生:
246141281024283026162022184850565258606254323446444042