谁能帮我做一个IObserver的同步订阅,这样调用方法就会阻塞,直到订阅完成。如:
出版商public static class Publisher {
public static IObservable<string> NonBlocking()
{
return Observable.Create<string>(
observable =>
{
Task.Run(() =>
{
observable.OnNext("a");
Thread.Sleep(1000);
observable.OnNext("b");
Thread.Sleep(1000);
observable.OnCompleted();
Thread.Sleep(1000);
});
return Disposable.Create(() => Console.WriteLine("Observer has unsubscribed"));
});
}
}
用户
public static class Subscriber{
public static bool Subscribe()
{
Publisher.NonBlocking().Subscribe((s) =>
{
Debug.WriteLine(s);
}, () =>
{
Debug.WriteLine("Complete");
});
// This will currently return true before the subscription is complete
// I want to block and not Return until the Subscriber is Complete
return true;
}
}
您需要使用System.Reactive.Threading.Task
:
把你的observable变成一个task…
var source = Publisher.NonBlocking()
.Do(
(s) => Debug.WriteLines(x),
() => Debug.WriteLine("Completed")
)
.LastOrDefault()
.ToTask();
Do(...).Subscribe()
和Subscribe(...)
一样。所以Do
只是增加了一些副作用。
LastOrDefault
在那里是因为ToTask
创建的Task
将只等待来自源Observable
的第一项,如果没有产生任何项,它将失败(抛出)。因此,LastOrDefault
有效地使Task
等待,直到源完成,无论它产生什么。
所以当我们有一个任务后,就等待它:
task.Wait(); // blocking
或者使用async/await:
await task; // non-blocking
<标题>编辑:Cory Nelson说得很好:
在最新版本的c#和Visual Studio中,您实际上可以await
和IObservable<T>
。这是一个非常酷的功能,但它的工作方式与等待Task
略有不同。
当您等待一个任务时,它会导致该任务运行。如果等待任务的单个实例多次,则该任务将只执行一次。可观察对象略有不同。你可以把一个可观察对象看作一个具有多个返回值的异步函数……每次订阅一个可观察对象时,可观察对象/函数都会执行。因此这两段代码有不同的含义:
等待Observable:
// Console.WriteLine will be invoked twice.
var source = Observable.Return(0).Do(Console.WriteLine);
await source; // Subscribe
await source; // Subscribe
等待一个可观察对象:
// Console.WriteLine will be invoked once.
var source = Observable.Return(0).Do(Console.WriteLine);
var task = source.ToTask();
await task; // Subscribe
await task; // Just yield the task's result.
所以,本质上,等待一个可观察对象是这样工作的:
// Console.WriteLine will be invoked twice.
var source = Observable.Return(0).Do(Console.WriteLine);
await source.ToTask(); // Subscribe
await source.ToTask(); // Subscribe
但是,await observable
语法在Xamerin Studio中不工作(截至撰写本文时)。如果您正在使用Xamerin Studio,我强烈建议您在最后一刻使用ToTask
,以模拟Visual Studio的await observable
语法的行为。