source1发射A、B、C、D等,并且从不完成
源2发射1,2并完成
我想合并到A1、B2、C1、D2等
更新
我最初的尝试是按照Theodor
的建议使用Zip
和Repeat
,但这会造成锁定,因为生成source2的成本很高。
Enigmativity
的最后一条评论解决了问题
source1.Zip(source2.ToEnumerable().ToArray().Repeat())
由于您想无限期地重复source2
,并且您说它很冷(从某种意义上说,它每次都会产生相同的值集,并且通常以相同的节奏(,而且成本很高,因此我们希望将IObservable<T>
转换为T[]
,以确保它只计算一次。
var array = source2.ToEnumerable().ToArray();
var output = source1.Zip(array.Repeat(), (x, y) => (x, y));
假设理想的大理石图是这样的:
Source1: +--------A-------B-------C--------D-------|
Source2: +----1--------------2--------|
Merged: +--------A1---------B2-------C1---D2------|
这里有一个具有这种行为的ZipWithRepeated
操作符:
static IObservable<(TFirst First, TSecond Second)> ZipWithRepeated<TFirst, TSecond>(
this IObservable<TFirst> first, IObservable<TSecond> second)
{
return second.Replay(replayed => first.ToAsyncEnumerable()
.Zip(replayed.ToAsyncEnumerable().Repeat())
.ToObservable());
}
用法示例:
var merged = source1.ZipWithRepeated(source2);
此解决方案需要对System.Linq.Async和System.Interactive.Async包的依赖项,因为这两个序列在压缩之前都转换为IAsyncEnumerable<T>
。
可选:与其依赖Rx-Replay
运算符来缓冲source2序列,更有效的解决方案是在从可观察到的转换为异步可枚举之后进行缓冲。AFAICS在官方Rx/Ix库中没有内置的对回放/记忆IAsyncEnumerable<T>
的支持,但创建一个具有嵌入式缓冲的自定义Repeat
运算符并不是很困难。以下是基于此思想的ZipWithRepeated
运算符的替代实现:
static IObservable<(TFirst First, TSecond Second)> ZipWithRepeated<TFirst, TSecond>(
this IObservable<TFirst> first, IObservable<TSecond> second)
{
return first.ToAsyncEnumerable()
.Zip(second.ToAsyncEnumerable().RepeatBuffered())
.ToObservable();
}
private async static IAsyncEnumerable<TSource> RepeatBuffered<TSource>(
this IAsyncEnumerable<TSource> source,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
var buffer = new List<TSource>();
await foreach (var item in source
.WithCancellation(cancellationToken).ConfigureAwait(false))
{
buffer.Add(item); yield return item;
}
while (true) foreach (var item in buffer) yield return item;
}
此实现不依赖于System.Interactive.Async包,而仅依赖于System.Linq.Anc包。