我使用使用CancellationDisposable
的Observable.Create
实现了一个可观察对象。由于我想在创建后共享可观察到的内容,我使用了Publish
和更高版本的Connect
。如果Connect
返回的一次性物品被丢弃,则CancellationDisposable
也被丢弃并且抛出OperationCancelledException
。问题是观察者o1
从未被告知异常。为什么会发生这种情况?我如何熟练地将CancellationDisposable
和Publish
组合使用?
var obs = Observable.Create<int>(
observer =>
{
var cancel = new CancellationDisposable();
var scheduler = Scheduler.Default.Schedule(() =>
{
try
{
observer.OnNext(1);
Thread.Sleep(2000);
cancel.Token.ThrowIfCancellationRequested();
observer.OnNext(2);
observer.OnCompleted();
}
catch (Exception ex)
{
observer.OnError(ex);
}
});
return new CompositeDisposable(cancel, scheduler);
})
.ObserveOn(ThreadPoolScheduler.Instance)
.Publish();
var o1 = obs.Subscribe(x => Console.WriteLine("Next: {0}", x), x => Console.WriteLine("Error: {0}", x), () => Console.WriteLine("Completed"));
var connection = obs.Connect();
Console.WriteLine("Press key to cancel");
Console.ReadLine();
connection.Dispose();
Console.ReadLine();
立即按Enter键输出:
Press key to cancel
Next: 1
处理连接会处理所有订阅(内部)。这是Rx的抵消模型。当一个订阅被释放时,观察者不再接收任何类型的通知。
参见处方设计指南文件中的§4.4。
如果您希望在观察员被取消时收到通知,请使用Finally运算符。请注意,它通常适用于终止,因此您的操作也会在完成或失败时调用。
var o1 = obs.Finally(() => Console.WriteLine("Terminated")).Subscribe(...);
更新:
仔细阅读我的答案,我意识到我并不完全清楚。Finally
运算符仅在您自己处理订阅时调用。处理连接只会处理对已发布的可观察对象的内部订阅,使您的订阅处于"活动"状态。原因是您可以随时重新连接已发布的可观察对象,并且您的订阅将继续收到通知。因此,o1
实际上根本没有被取消。
此外,无论如何都不能调用OnCompleted
,因为这样,通过简单地重新连接可观察对象,就可以在同一订阅上多次调用它,这当然违反了Rx语法。参见处方设计指南中的§4.1
更新2:
正如评论中所提到的,虽然OnCompleted
在取消时不被调用,但它是在可观察到的成功终止时调用的;然而,观察者将来将不再接收任何通知(以满足Rx语法),即使在随后的重新连接时也是如此。此外,由于序列已终止,所有Finally
运算符都将已执行。
更新3:回答
在取消可连接的可观测值时,当然有可能让OnCompleted
调用每个观测者,尽管我不建议这样做,因为这非常奇怪,因为它违背了Rx中的原则,但遗憾的是,我必须提供它来从技术上回答最初的问题。该解决方案的关键是使用Subject<T>
以及在呼叫Publish
之前插入的Finally
运算符。
请注意,我对代码进行了轻微的更改,以使用异步迭代器并删除错误处理(请参阅下面的注释)。
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;
namespace RxLabs.Net45
{
class PublishFinallyLab
{
public static void Main()
{
var obsOrCancellation = new Subject<int>();
var obs = Observable.Create<int>(
async (observer, cancel) =>
{
observer.OnNext(1);
await Task.Delay(TimeSpan.FromSeconds(2), cancel).ConfigureAwait(false);
if (!cancel.IsCancellationRequested)
{
observer.OnNext(2);
}
})
.Finally(obsOrCancellation.OnCompleted)
.Publish();
obs.Subscribe(obsOrCancellation);
var o1 = obsOrCancellation
.Finally(() => Console.WriteLine("Finally!"))
.Subscribe(
x => Console.WriteLine("Next: {0}", x),
ex => Console.WriteLine("Error: {0}", ex),
() => Console.WriteLine("Completed"));
do
{
using (var connection = obs.Connect())
{
Console.WriteLine("Press any key to cancel.");
Console.ReadKey();
}
Console.WriteLine("Press any key to continue.");
Console.ReadKey();
}
while (true);
}
}
}
--
在一个无关但重要的方面,捕捉观察者抛出的异常是一种反模式。不要这样做。你应该完全删除try..catch
。在可观察到的对非观察者代码的调用中没有可能抛出的代码,因此根本不应该调用OnError
。