响应式扩展同步订阅



谁能帮我做一个IObserver的同步订阅,这样调用方法就会阻塞,直到订阅完成。如:

出版商

public static class Publisher {
public static IObservable<string> NonBlocking()
    {
        return Observable.Create<string>(
            observable =>
            {
                Task.Run(() =>
                {
                    observable.OnNext("a");
                    Thread.Sleep(1000);
                    observable.OnNext("b");
                    Thread.Sleep(1000);
                    observable.OnCompleted();
                    Thread.Sleep(1000);
                });
                return Disposable.Create(() => Console.WriteLine("Observer has unsubscribed"));
            });
    }

}

用户

public static class Subscriber{
public static bool Subscribe()
    {
        Publisher.NonBlocking().Subscribe((s) =>
        {
            Debug.WriteLine(s);
        }, () =>
        {
            Debug.WriteLine("Complete");
        });
        // This will currently return true before the subscription is complete
        // I want to block and not Return until the Subscriber is Complete
        return true;
    }

}

您需要使用System.Reactive.Threading.Task:

把你的observable变成一个task…

var source = Publisher.NonBlocking()
    .Do(
        (s) => Debug.WriteLines(x),
        ()  => Debug.WriteLine("Completed")
    )
    .LastOrDefault()
    .ToTask();

Do(...).Subscribe()Subscribe(...)一样。所以Do只是增加了一些副作用。

LastOrDefault在那里是因为ToTask创建的Task将只等待来自源Observable的第一项,如果没有产生任何项,它将失败(抛出)。因此,LastOrDefault有效地使Task等待,直到源完成,无论它产生什么。

所以当我们有一个任务后,就等待它:

task.Wait(); // blocking
或者使用async/await:
await task; // non-blocking
<标题>编辑:

Cory Nelson说得很好:

在最新版本的c#和Visual Studio中,您实际上可以awaitIObservable<T>。这是一个非常酷的功能,但它的工作方式与等待Task略有不同。

当您等待一个任务时,它会导致该任务运行。如果等待任务的单个实例多次,则该任务将只执行一次。可观察对象略有不同。你可以把一个可观察对象看作一个具有多个返回值的异步函数……每次订阅一个可观察对象时,可观察对象/函数都会执行。因此这两段代码有不同的含义:

等待Observable:

// Console.WriteLine will be invoked twice.
var source = Observable.Return(0).Do(Console.WriteLine);
await source; // Subscribe
await source; // Subscribe

等待一个可观察对象:

// Console.WriteLine will be invoked once.
var source = Observable.Return(0).Do(Console.WriteLine);
var task = source.ToTask();
await task; // Subscribe
await task; // Just yield the task's result.
所以,本质上,等待一个可观察对象是这样工作的:
// Console.WriteLine will be invoked twice.
var source = Observable.Return(0).Do(Console.WriteLine);
await source.ToTask(); // Subscribe
await source.ToTask(); // Subscribe

但是,await observable语法在Xamerin Studio中不工作(截至撰写本文时)。如果您正在使用Xamerin Studio,我强烈建议您在最后一刻使用ToTask,以模拟Visual Studio的await observable语法的行为。

相关内容

  • 没有找到相关文章