订阅并在首次操作后立即取消订阅



我想在IObservable<T>上订阅并在收到类型T的第一个元素后立即取消订阅(dipose)订阅,即我只想在订阅后获得的第一个元素上调用操作。

这是我想出的方法:

public static class Extensions
{
    public static void SubscribeOnce<T>(this IObservable<T> observable, Action<T> action)
    {
        IDisposable subscription = null;
        subscription = observable.Subscribe(t =>
        {
            action(t);
            subscription.Dispose();
        });
    }
}

用法示例:

public class Program
{
    public static void Main()
    {
        var subject = new Subject<int>();
        subject.OnNext(0);
        subject.OnNext(1);
        subject.SubscribeOnce(i => Console.WriteLine(i));
        subject.OnNext(2);
        subject.OnNext(3);
        Console.ReadLine();
    }
}

它按预期工作,仅打印2 .这有什么问题或其他需要考虑的吗?也许有没有一种更清洁的方法使用开箱即用的 RX 附带的扩展方法?

var source = new Subject();
source
  .Take(1)
  .Subscribe(Console.WriteLine);
source.OnNext(5);
source.OnNext(6);
source.OnError(new Exception("oops!"));
source.OnNext(7);
source.OnNext(8);
// Output is "5". Subscription is auto-disposed. Error is ignored.

Takenth 元素生成后自动释放订阅。 :)

至于要考虑的其他事项,对于您的自定义可观察量,您应该注意,您可能还希望将 OnError 和 OnCompleted 通知传递给您的观察器,Take也会为您处理。

内置操作员还有其他好处,例如更好的Dispose处理。

相关内容

  • 没有找到相关文章

最新更新