给定从事件流创建的 IObservable 序列:
IObservable<MyEvent> observable = EventAggregator.GetEvent<MyEvent>()
我得到第一个事件发生:
MyEvent myEvent = await observable.FirstOrDefaultAsync();
但是,我想处理订阅,或者在用户单击取消按钮时中断可观察量。
目前我使用解决方法.ToTask(( 扩展方法,但我相信只有基于 Rective 扩展的更干净的解决方案。
_tsc = new CancellationTokenSource();
MyEvent myEvent;
try
{
myEvent = await EventAggregator.GetEvent<MyEvent>()
.FirstOrDefaultAsync()
.ToTask(__tsc.Token);
}
catch (TaskCanceledException)
{
myEvent = null;
}
void Cancel()
{
_tsc.Cancel();
}
使用内置的 Rx 运算符,您想要执行的操作非常简单。
只需这样做:
IObservable<MyEvent> observable = EventAggregator.GetEvent<MyEvent>()
var endItAll = new Subject<Unit>();
MyEvent myEvent = await observable.TakeUntil(endItAll).FirstOrDefaultAsync();
现在,您只需调用endItAll.OnNext(Unit.Default)
即可结束订阅并返回null
MyEvent
。
我不确定这是否"更干净",但您可以在没有这样ToTask
的情况下做到这一点:
static async void Test(IObservable<int> ob, CancellationToken ct)
{
var first = await ob.TakeUntil(Observable.Create<Unit>(o => ct.Register(() => {
o.OnNext(Unit.Default);
o.OnCompleted();
}))).FirstOrDefaultAsync();
}
因此,我们创建了另一个可观察量,当取消令牌被取消时,它将产生一个元素,然后我们使用TakeUntil
重载,它将返回第一个序列中的元素,直到第二个序列生成一个元素。因此,在取消令牌后 - 您的 await
语句将返回默认值(引用类型为 null
(。
您可以将其移动到扩展方法,然后它看起来会更好:
public static class Extensions {
public static IObservable<T> TakeUntilCancelled<T>(this IObservable<T> ob, CancellationToken ct) {
return ob.TakeUntil(Observable.Create<Unit>(o => ct.Register(() =>
{
o.OnNext(Unit.Default);
o.OnCompleted();
})));
}
}
var first = await ob.TakeUntilCancelled(ct).FirstOrDefaultAsync();