将 IObservable<IEnumerable<T>> 转换为 IEnumerable<IObservable<T>>



如何将一个可枚举的xys的可观察对象变为可枚举的yxs,其中yxs的每个可观察对象都关注xys的每个时间步长的特定元素?我想要的类似于可枚举对象的可枚举对象的换位。

的例子:

IObservable<IEnumerable<int>> xys = Observable.Generate(0, _ => true, i => ++i, i => new[] {0 + i, 1 + i, 2 + i});
// xys = emits {0,1,2}, {1,2,3}, ...
IEnumerable<IObservable<int>> yxs = new[]
{
    Observable.Generate(0, i=>true, i=> ++i, i=>i),
    Observable.Generate(1, i=>true, i=> ++i, i=>i),
    Observable.Generate(2, i=>true, i=> ++i, i=>i),
};
// yxs = {emits 0, 1, ...}, {emits 1, 2, ...}, {emits 2, 3, ...}

我对一个已经是Rx一部分的函数特别感兴趣。像上面的例子一样,无限的可观察对象和无限的可枚举对象应该是可能的。

这会正确转换你的"types"。然而,你不能改变何时/如何在下面传递元素的语义,所以不管你怎么做,你最终只会缓冲和阻塞。

dest = source.ToEnumerable().Map(x => x.ToObservable());

这是我目前想到的。然而,它是一个自制的解决方案,不是Rx的一部分。如果有人能给我指出一个涉及库函数的解决方案,我会更加感激。

public static IEnumerable<IObservable<T>> ObserveElements<T>(this IObservable<IEnumerable<T>> obs)
{
    var i = 0;
    while (true)
    {
        var idx = i++;
        yield return from enumerable in obs
                     let x = enumerable.ElementAtOrDefault(idx)
                     where !Equals(x, default(T))
                     select x;
    }
}

显然,你只需要.Take()那么多的可观察对象。

在Haskell术语中,我认为它实际上是sequenceIObservable单子的实现,专门用于IEnumerable

相关内容

  • 没有找到相关文章

最新更新