使用 System.Reactive.Linq 订阅取消的第一个事件



给定从事件流创建的 IObservable 序列:

IObservable<MyEvent> observable = EventAggregator.GetEvent<MyEvent>()

我得到第一个事件发生:

MyEvent myEvent = await observable.FirstOrDefaultAsync();

但是,我想处理订阅,或者在用户单击取消按钮时中断可观察量。

目前我使用解决方法.ToTask(( 扩展方法,但我相信只有基于 Rective 扩展的更干净的解决方案。

_tsc = new CancellationTokenSource();
MyEvent myEvent;
try
{
    myEvent = await EventAggregator.GetEvent<MyEvent>()
        .FirstOrDefaultAsync()
        .ToTask(__tsc.Token);
}
catch (TaskCanceledException)
{
    myEvent = null;
}

void Cancel()
{
    _tsc.Cancel();
}

使用内置的 Rx 运算符,您想要执行的操作非常简单。

只需这样做:

IObservable<MyEvent> observable = EventAggregator.GetEvent<MyEvent>()
var endItAll = new Subject<Unit>();
MyEvent myEvent = await observable.TakeUntil(endItAll).FirstOrDefaultAsync();

现在,您只需调用endItAll.OnNext(Unit.Default)即可结束订阅并返回null MyEvent

我不确定这是否"更干净",但您可以在没有这样ToTask的情况下做到这一点:

static async void Test(IObservable<int> ob, CancellationToken ct)
{
    var first = await ob.TakeUntil(Observable.Create<Unit>(o => ct.Register(() => {
            o.OnNext(Unit.Default);
            o.OnCompleted();
    }))).FirstOrDefaultAsync();            
} 

因此,我们创建了另一个可观察量,当取消令牌被取消时,它将产生一个元素,然后我们使用TakeUntil重载,它将返回第一个序列中的元素,直到第二个序列生成一个元素。因此,在取消令牌后 - 您的 await 语句将返回默认值(引用类型为 null(。

您可以将其移动到扩展方法,然后它看起来会更好:

public static class Extensions {
    public static IObservable<T> TakeUntilCancelled<T>(this IObservable<T> ob, CancellationToken ct) {
        return ob.TakeUntil(Observable.Create<Unit>(o => ct.Register(() =>
        {
            o.OnNext(Unit.Default);
            o.OnCompleted();
        })));
    }
}
var first = await ob.TakeUntilCancelled(ct).FirstOrDefaultAsync();      

最新更新