如何在发布之前将取消令牌插入ReactiveX流(IObservable) ?



如何在调用Publish之前将取消令牌插入现有的IObservable管道(即,在它成为IConnectableObservable之前)?

在订阅它之前,它必须是冷可观察管道的一部分(否则,我可以将CancellationToken令牌传递给IObservableSubscribe,RunAsync,ToTask等)。

有什么推荐的模式吗?

我可以想到使用TakeUntil来实现这一点,正如Theodor Zoulias在这里建议的那样。例如:

using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
async Task Test(CancellationToken token)
{
var publishedSequence = Observable
.Interval(TimeSpan.FromMilliseconds(100))
.Do(n => Console.WriteLine($"Emitting: {n}"))
.Skip(3)
.TakeUntil(
Observable.Create<long>(
observer => token.Register(
(_, token) => observer.OnError(new OperationCanceledException(token)),
null)))
.Finally(() => Console.WriteLine($"Finally"))
.Publish();
using var subscription = publishedSequence.Subscribe(
onNext: n => Console.WriteLine($"OnNext: {n}"),
onError: e => Console.WriteLine($"OnError: {e}"),
onCompleted: () => Console.WriteLine("OnCompleted"));
using var connection = publishedSequence.Connect();
await publishedSequence.ToTask();
}
var cts = new CancellationTokenSource(1000);
await Test(cts.Token);

输出:

Emitting: 0
Emitting: 1
Emitting: 2
Emitting: 3
OnNext: 3
Emitting: 4
OnNext: 4
Emitting: 5
OnNext: 5
Emitting: 6
OnNext: 6
Emitting: 7
OnNext: 7
Emitting: 8
OnNext: 8
OnError: System.OperationCanceledException: The operation was canceled.
Finally

我也有一个自定义运算符WithCancellation的原型,它基本上是一个直通IObservable,也侦听取消信号。不过,我宁愿坚持使用标准方法。

我想我已经发现了TakeUntil如何工作的竞争条件(也许是一个bug或只是一个我无法解释的行为),小提琴。我不能用我自己的WithCancellation实现(在小提琴中注释掉了)来复制它。

更新,如果我使用.TakeUntil(Task.Delay(Timeout.Infinite, token).ToObservable()),我也不能复制它。

事实上,库中没有这样的方法。我会从CancellationToken中创建一个可观察对象,然后使用TakeUntil操作符。

public static IObservable<Unit> ToObservable(this CancellationToken cancellationToken) {
// if the token can't be cancelled, use Never which will not complete
if (!cancellationToken.CanBeCanceled) {
return Observable.Never<Unit>();
}
// if the token is already cancelled, use Return which publishes
// Unit and completes immediately.
// This may save you from ObjectDisposedException later
if (cancellationToken.IsCancellationRequested) {
return Observable.Return<Unit>();
}
// use Create so that each .Subscribe is handled independently
return Observable.Create<Unit>(observer => {
// Observable.Create does not handle errors on its own
try {
// return the registration because Dispose will unregister it
return cancellationToken.Register(() => {
// When the token is cancelled, publish and complete
observer.OnNext(Unit.Default);
observer.OnCompleted();
});
}
catch (ObjectDisposedException e) {
// todo: consider handling this as if it were cancellation
observer.OnError(e);
}
});
}
public static IObservable<T> TakeUntil<T>(this IObservable<T> source, CancellationToken cancellationToken)
=> source.TakeUntil(cancellationToken.ToObservable());

相关内容

  • 没有找到相关文章

最新更新