连接依赖于上一个IOobservable输出的IOobservables



我有一个函数IObservable<SomeEvent> SomeColdStream(int? someParam),我需要按顺序多次调用它,但someParam参数需要来自上一个SomeColdStream调用的最后一个事件,并且我最终需要一个Observable,其中所有SomeEvents都由它发出。

我的第一次尝试是.Concat组合子:

SomeColdStream(null).Concat(SomeColdStream(<<insert information from the last SomeEvent of first SomeColdStream call>>));

但我不知道如何将信息从一个流传送到下一个流。

我的下一次尝试是使用Publish:

var firstStream = SomeColdStream(null).Publish().RefCount()
var previous = firstStream().LastAsync();
firstStream.Concat(previous.SelectMany(p => SomeColdStream(p));

我还没有测试过这一点,因为这对Rx来说似乎有点"不合法",我想问是否有人知道我如何用更好的方法实现我想要的目标?

我知道我可以将IOobservable与";async/await";但这打破了我的测试,该测试严重依赖于CCD_;async/await";。

谢谢!

一个想法是将LastAsyncSelectMany运算符组合起来,如下所示:

IObservable<SomeEvent> sequence = SomeColdStream(null)
.LastAsync()
.SelectMany(x => SomeColdStream(x));

但是这将仅传播第二流的消息,并且第一流的所有消息都将被抑制。这个问题可能可以通过将第一个流合并到上面的序列来解决,但这也需要Publish作为第一个流,这样它就不会订阅两次。下面是一个扩展方法ConcatWithResult,它应该做正确的事情:

static IObservable<T> ConcatWithResult<T>(this IObservable<T> source,
Func<T, IObservable<T>> continuation)
{
return source.Publish(published => published.Merge(published
.LastAsync()
.SelectMany(x => continuation(x))));
}

使用";"副作用";Do操作员:

static IObservable<T> ConcatWithResult<T>(this IObservable<T> source,
Func<T, IObservable<T>> continuation)
{
return Observable.Defer(() =>
{
(T Value, bool HasValue) last = default;
return source
.Do(x => last = (x, true))
.Concat(Observable.Defer(() =>
{
if (!last.HasValue) throw new InvalidOperationException();
return continuation(last.Value);
}));
});
}

我想你正在寻找这个:

IObservable<TSource> Expand<TSource>(this IObservable<TSource> source, Func<TSource, IObservable<TSource>> selector)

然后你可以这样构建你的查询:

IObservable<SomeEvent> query =
SomeColdStream(null)
.TakeLast(1)
.Expand(x => SomeColdStream(x.SomeValue).TakeLast(1));

让我们举一个例子:

IObservable<int> Demonstration(int? input)
{
if (input == null)
return Observable.Return(1);
else if (input < 5)
return new[] { input.Value + 1, input.Value * 3 }.ToObservable();
else
return Observable.Empty<int>();
}

现在我可以写这个查询了:

IObservable<int> query =
Demonstration(null)
.TakeLast(1)
.Expand(x => Demonstration(x).TakeLast(1));

从而产生序列CCD_ 13。

最新更新