模拟 IObservable 的收益回报



当我不知道如何使用 LINQ 生成一些可枚举项时,我只需创建自己的扩展方法并使用 yield 关键字。这给了我一个闭包,我可以在其中存储计数器或其他一些聚合值等内容。

IObservable我不知道是否有内置的方法可以做到这一点。我最近想生成一个可观察量,给定另一个IObservable<string>,忽略所有内容,直到源中出现起始值(例如"开始"(,然后开始忽略所有内容,直到源产生结束值(例如"结束"(。

因此,例如,如果我的来源是{"1", "start", "2", "3", "end", "4", "start", "5", "end},则新的可观察量应该是{"2", "3" "5"}

可能有一种方法可以使用 IObservable 的内置 int 方法来做到这一点,但如果它是一个IEnumerable,使用 yield 关键字会很简单。所以我想知道是否有类似的简单方法可以为 IObservable 做到这一点。

我想出了这个基本上可以完成这项工作的小类:

public class ClosureSelectMany<TSource, TResult> : IObservable<TResult>
{
    private readonly IObservable<TSource> _Source;
    private readonly Func<Func<TSource, IObservable<TResult>>> _Selector;
    public ClosureSelectMany(IObservable<TSource> source, Func<Func<TSource, IObservable<TResult>>> selector)
    {
        _Source = source;
        _Selector = selector;
    }
    public IDisposable Subscribe(IObserver<TResult> observer)
    {
        var selector = _Selector();
        return _Source.SelectMany(selector).Subscribe(observer);
    }
}
public static class ObservableHelpers
{
    public static IObservable<TResult> ClosureSelectMany<TSource, TResult>(this IObservable<TSource> source, Func<Func<TSource, IObservable<TResult>>> selector)
    {
        return new ClosureSelectMany<TSource, TResult>(source, selector);
    }
}

然后我可以像这样使用它:

test = input.ClosureSelectMany<string, string>(() =>
{
    bool running = false;
    return val =>
    {
        if (val == "end")
            running = false;
        var result = running ? Observable.Return(val) : Observable.Empty<string>();
        if (val == "start")
            running = true;
        return result;
    };
});

这看起来像如果我在 IEnumerable 中使用 yield 关键字我会做什么。

所以我想知道我是否正在重新发明轮子,并且已经有一些内置功能可以做到这一点,如果不是为什么。也许这种方法可能会导致我现在没有看到的任何其他问题。

您可以使用

Observable.Create而不是将现有运算符组合在一起,尽管尽可能组合通常是一个更好的主意。

请注意,Create 还具有接受 Task -return 函数的重载,允许您定义异步迭代器 - 一个使用 await 而不是 yield 的协程,尽管这不是您在这里特别需要的。

但是,您的问题可以通过组合ScanWhereSelect来解决:

xs.Scan(
  new { Running = false, Value = default(string) }, 
  (previous, current) => new
  {
    Running = current == "start" || (previous.Running && current != "end"), 
    Value = current
  })
  .Where(state => state.Running && state.Value != "start")
  .Select(state => state.Value);

对于可观察量,没有yield return等价物。它肯定是可以烘焙到编译器中的东西,但它目前不可用。

但是,使用可用的运算符,您可以轻松完成所需的操作。

这对我有用:

var results =
    source
        .Publish(ss =>
            ss
                .Window(
                    ss.Where(s0 => s0 == "start"),
                    s => ss.Where(s1 => s1 == "end"))
                .Select(xs => xs.Skip(1).SkipLast(1))
                .Merge());

鉴于此来源:

var source = new []
{
    "1", "start", "2", "3", "end", "4", "start", "5", "end"
}.ToObservable();

我得到这个输出:

2 
3 
5 

首先,我想澄清一下,这里的重点是收益率回报的替代方案。

其次,这个特定的示例问题可以这样解决:

bool running = false;
Observable.ToObservable(new [] {"1", "start", "2", "3", "end", "4", "start", "5", "end"})
    .Where(s => {
        if (s == "start") running = true;            
        if (s == "end") running = false;                                    
        return (running && s != "start");
    })
    .Dump();

现在,答案是:

您需要记住,收益回报只是语法糖,使您不必自己实现 IEnumerable 和 IEnumerator。

您可以将其视为 IEnumerator.MoveNext(( 和 IEnumerator.Current 的组合,而无需额外的类来保存枚举器的状态,如 MSDN 文档 (https://msdn.microsoft.com/en-us/library/9k7k7cf0.aspx( 中所述。

您可能还读过IObservable是异步/推送">"到同步/拉取IEnumerable(http://reactivex.io/intro.html(。

考虑到所有这些,你可以认为 IObservable 等价于 yield return(IEnumerator.MoveNext(( + IEnumerator.Current( 是 IObserver.OnNext(T value( 方法。

这样想,您可以轻松地实现此示例中所需的内容(在 LINQPad 中测试(:

void Main()
{
    var q = FilteredSource.Generate();
    q.Dump();
}
public class FilteredSource
{
    public static IObservable<string> Generate()
    {           
        var q = from s in OriginalSource.Generate()
                select s;
        return Observable.Create<string>(
            async observer =>
            {           
                bool produce = false;                
                try
                {
                    await q.ForEachAsync(s => {
                        if (s == "start")
                            produce = true;
                        if (s == "end")
                            produce = false;                        
                        if (produce && s != "start")
                            observer.OnNext(s);                        
                    });
                }
                catch (Exception ex)
                {
                    observer.OnError(ex);
                }
                observer.OnCompleted();
                return Disposable.Empty;
            });
    }
}
public class OriginalSource
{
    public static IObservable<string> Generate()
    {
        return Observable.Create<string>(
            observer =>
            {
                try
                {
                    string[] list = {"1", "start", "2", "3", "end", "4", "start", "5", "end"};
                    foreach (string s in list)
                        observer.OnNext(s);
                }
                catch (Exception ex)
                {
                    observer.OnError(ex);
                }
                observer.OnCompleted();
                return Disposable.Empty;
            });
    }            
}

相关内容

  • 没有找到相关文章

最新更新