我正在构建一个消息处理管道,并注意到当最后一个观察者处理订阅时,可观察量仍在抽取数据。
我查看了 Rx 文档,我基于它的假设是,根据文档,一旦最后一个观察者取消订阅,RefCount()
会断开可观察对象的连接:
然后,RefCount 会跟踪有多少其他观察者订阅了它,并且在最后一个观察者这样做之前不会断开与底层可连接可观察对象的连接。
为了说明这个问题,我在下面创建了一个非常简约的示例:
class Program
{
static void Main(string[] args)
{
_ = SimulateObservableIssue();
Console.ReadKey();
}
public static async Task SimulateObservableIssue()
{
IObservable<int> source = Observable.Create<int>(async (observer) =>
{
for (int i = 0; i < 10; i++)
{
Console.WriteLine($"Source publishing {i}");
observer.OnNext(i);
await Task.Delay(1000);
}
observer.OnCompleted();
return Disposable.Create(() => Console.WriteLine("Observable is disposed"));
});
var multiSource = source.Publish().RefCount();
var subscription = multiSource.Subscribe(x => Console.WriteLine("Observer received: " + x));
await Task.Delay(3000);
subscription.Dispose();
Console.WriteLine("Subscription disposed");
}
}
输出:
Source publishing 0
Observer received: 0
Source publishing 1
Observer received: 1
Source publishing 2
Observer received: 2
Subscription disposed
Source publishing 3
Source publishing 4
Source publishing 5
Source publishing 6
Source publishing 7
Source publishing 8
Source publishing 9
Observable is disposed
为什么在subscription.Dispose()
之后,可观察量仍然试图生成数据?
您的source
可观察量不尊重您提到的可观察量合约。如果将source
替换为以下内容:
var source = Observable.Interval(TimeSpan.FromSeconds(1))
.Do(i => Console.WriteLine($"Source publishing {i}"), () => Console.WriteLine("Observable is disposed"))
.Take(10);
。您将看到它按预期工作。
至于为什么,想想一个可观察量有两个阶段:订阅和观察。无论订阅是否取消,在订阅期间发生的代码始终会发生。Observable.Create
代码都是订阅代码。
我编写的可观察量都是观察代码(大多数可观察代码应该是这样)。因此,它会适当地响应订阅取消。
您观察到的行为与RefCount
运算符无关。如果您直接订阅source
而不是multiSource
,则行为将是相同的。
该问题与您如何将Observable.Create
与异步 lambda 一起使用以创建自定义可观察量有关。Rx 库无法终止异步 lambda 创建的Task
。因此,尽管观察者已取消订阅,但任务仍在继续。一般情况下,任务只能合作终止,终止不再需要的任务的标准机制是通过发出任务观察到的CancellationToken
信号。Rx 库通过具有以下签名的Observable.Create
重载来支持此模式:
public static IObservable<TResult> Create<TResult>(
Func<IObserver<TResult>, CancellationToken, Task> subscribeAsync);
提供的CancellationToken
由库提供和管理,您的责任是通过终止循环来遵守取消信号。一种方法是在循环内的各个点调用CancellationToken.ThrowIfCancellationRequested()
方法。在具体示例中,将CancellationToken
作为参数传递给Task.Delay
方法就足够了:
var source = Observable.Create<int>(async (observer, cancellationToken) =>
{
cancellationToken.Register(() => Console.WriteLine("Token is canceled"));
for (int i = 0; i < 10; i++)
{
Console.WriteLine($"Source publishing {i}");
observer.OnNext(i);
await Task.Delay(1000, cancellationToken);
}
observer.OnCompleted();
Console.WriteLine($"Source is completed");
});
在取消的情况下,await Task.Delay
抛出的异常不会被任何人观察到,因为此时可观察量将没有观察者。没有理由尝试/捕获它,除非您想记录它。
您可能已经注意到 lambda 不返回Disposable
。发生这种情况是因为Disposable
的角色现在由提供的CancellationToken
.在引擎盖下,库正在使用一个CancellationDisposable
,当处置时取消一个内部CancellationTokenSource
,其Token
被传递给你的方法。