在使用.NET的反应性扩展中,我如何避免使用Observable.FromeventPattern



我正在努力解决与TaskPoolScheduler上订阅Observable.FromEventPattern()有关的一些并发问题。

让我用代码示例说明:

var dataStore = new DataStore();
Observable.FromEventPattern<DataChangedEventArgs>(dataStore, nameof(dataStore.DataChanged))
    .SubscribeOn(TaskPoolScheduler.Default)
    .Select(x => x.EventArgs)
    .StartWith(new DataChangedEventArgs())
    .Throttle(TimeSpan.FromMilliseconds(25))
    .Select(x => 
    {
        Thread.Sleep(5000); // Simulate long-running calculation.
        var result = 42;
        return result;
    })
    .ObserveOn(new SynchronizationContextScheduler(SynchronizationContext.Current))
    .Subscribe(result =>
    {
        // Do some interesting work with the result.
        // ...
        // Do something that makes the DataStore raise another event.
        dataStore.RaiseDataChangedEvent(); // <- DEADLOCK!
    });
dataStore.RaiseDataChangedEvent(); // <- Returns immediately, i.e. does NOT wait for long-running calculation.
dataStore.RaiseDataChangedEvent(); // <- Blocks while waiting for the previous long-running calculation to complete, then returns eventually.

我的问题是,当原始可观察的Observable.FromEventPattern()发出任何新项目时(即,当DataStore对象启动新的DataChanged事件时,它们似乎被阻止了等待先前的项目以完成整个管道的流动。

由于订阅是在TaskPoolScheduler上完成的,因此我期望每件新的项目简单地启动新任务,但是实际上,如果管道忙碌,事件的来源似乎会阻止事件调用。

我如何完成在其自己的任务/线程上执行每个新发射项目(凸起的事件(的订阅,从而使源对象永远不会阻止其内部DataChangedEvent.Invoke()调用?

(当然,除了在UI线程上执行的Subscribe() lambda以外 - 已经是这种情况。(

作为旁注:#RXNET Slack频道中提到的@Jonstodle,TaskPoolScheduler的语义可能与我假设不同。具体来说,他说这可能会创建任务,并在该任务内的事件循环中执行订阅和产生值。但是,如果是这样,那么我发现 first 事件调用不会阻止(因为第二个事件(有点奇怪。在我看来,如果执行订阅的任务池任务足够异步,以至于souce不必在第一个调用中阻止,那么也不需要在第二个调用上进行阻止吗?

您遇到的问题仅仅是RX的工作方式 - 普通RX管道中产生的每个值都是,管道良好,一次仅处理一个值。如果RX管道的来源,在您的情况下,FromEventPattern<DataChangedEventArgs>的产生速度比观察者处理的速度更快,则它们会在管道中排队。

规则是,管道中的每个观察者一次只能处理一个值。对于任何调度程序,不仅是TaskPoolScheduler

使它起作用的方法非常简单 - 您创建并行管道,然后将值合并到单个管道中。

这是更改:

Observable
    .FromEventPattern<DataChangedEventArgs>(dataStore, nameof(dataStore.DataChanged))
    .SubscribeOn(TaskPoolScheduler.Default)
    .Select(x => x.EventArgs)
    .StartWith(new DataChangedEventArgs())
    .Throttle(TimeSpan.FromMilliseconds(25))
    .SelectMany(x =>
        Observable.Start(() =>
        {
            Thread.Sleep(5000); // Simulate long-running calculation.
            var result = 42;
            return result;
        }))
    .ObserveOn(new SynchronizationContextScheduler(SynchronizationContext.Current))
    .Subscribe(result =>
    {
        // Do some interesting work with the result.
        // ...
        // Do something that makes the DataStore raise another event.
        dataStore.RaiseDataChangedEvent();
    });

.SelectMany(x => Observable.Start(() =>取代了.Select(x =>,允许该值是一个新的可观察订阅,该订阅立即运行,然后将值合并回单个可观察的。

您可能更喜欢将其写入语义上相同的.Select(x => Observable.Start(() => ...)).Merge()

这是一个简单的示例,显示了它的工作方式:

var source = new Subject<int>();
source
    .SelectMany(x =>
        Observable.Start(() =>
        {
            Thread.Sleep(1000);
            return x * 2;
        }))
    .Subscribe(result =>
    {
        Console.WriteLine(result);
        source.OnNext(result);
        source.OnNext(result + 1);
    });
source.OnNext(1);

它产生:

246141281024283026162022184850565258606254323446444042

相关内容

  • 没有找到相关文章

最新更新