如何重复可观察序列直到它为空?



我有一个IObservable<int>序列,它在订阅前9次时发出一个项目,在进一步订阅时,它什么也不发出,并立即完成:

int counter = 0;
IObservable<int> source = Observable.Defer(() =>
{
if (++counter < 10)
return Observable.Return(counter).Delay(TimeSpan.FromMilliseconds(100));
else
return Observable.Empty<int>();
});

现在我想重复这个序列,直到它完成。所以我使用了Repeat运算符:

source
.Repeat()
.Do(x => Console.WriteLine(x), () => Console.WriteLine("Completed"))
.Wait();

问题是这个查询永远不会完成。CCD_ 3一次又一次地订阅CCD_。更糟糕的是,当source停止生成元素时,查询进入了一个无情的死亡循环,劫持了CPU的一个核心(我的四核机器报告持续的CPU利用率为25%(。这是上面代码的输出:

1
2
3
4
5
6
7
8
9

我想要的是Repeat运算符的变体,当source停止生成元素时,它停止重复source。通过搜索内置的Rx运算符,我可以看到RepeatWhen运算符,但显然这只能用于更快地开始下一次重复,而不能完全停止重复:

// Repeatedly resubscribes to the source observable after a normal completion and
// when the observable returned by a handler produces an arbitrary item.
public static IObservable<TSource> RepeatWhen<TSource, TSignal>(
this IObservable<TSource> source,
Func<IObservable<object>, IObservable<TSignal>> handler);

不过,我不能100%确定,因为handler参数的描述非常模糊,所以我可能缺少一些东西:

为每个观察者调用并获取可观察序列对象的函数。它应返回任意项目的可观察项,该可观察项应响应于从可观察源接收到完成信号而发信号通知该任意项目。如果这个可观察到的信号是一个终端事件,则该序列以该信号终止。

我的问题是:如何实现一个重复source序列直到它为空的RepeatUntilEmpty运算符?是否可以基于上述RepeatWhen运算符来实现它?如果没有,我是否应该进入低级别(Observable.Create(并从头开始重新实现基本的Repeat功能?或者,我可以利用Materialize运算符,以某种方式将其与现有的Repeat相结合?我现在没有主意。我愿意接受任何形式的解决方案,无论是高杠杆还是低杠杆。

public static IObservable<T> RepeatUntilEmpty<T>(this IObservable<T> source)
{
// What to do?
}

在我的原始代码中用RepeatUntilEmpty替换Repeat,应该可以在发出9元素后立即完成查询。

您实际上可以使用Materialize()/Dematerialize()根据从Repeat()语句接收到的通知构建自己的通知序列。通知顺序如下:

1C 2C 3C 4C 5C 6C 7C 8C 9C C C C ...

因此,我们寻找两个连续的OnCompleted通知。如果我们没有找到,我们仍然返回收到的OnNext通知,否则我们返回OnCompleted通知。代码可以是这样的:

public static void Main(string[] args)
{
int counter = 0;
IObservable<int> source = Observable.Defer(() =>
{
Console.WriteLine($"counter is now: {counter}");
if (counter > 20) {
System.Environment.Exit(1);
}
if (++counter < 10)
return Observable.Return(counter).Delay(TimeSpan.FromMilliseconds(100));
else
return Observable.Empty<int>();
});
source
.RepeatUntilEmpty()
.Subscribe(x => {
System.Threading.Thread.Sleep(10);
Console.WriteLine($"SUBSCRIBE: {x}");
}, () => Console.WriteLine("SUBSCRIBE:Completed"));
System.Threading.Thread.Sleep(10000);
Console.WriteLine("Main thread terminated");
}

使用RepeatUntilEmpty()方法如下:

public static IObservable<T> RepeatUntilEmpty<T>(this IObservable<T> source)
{
return source
.Materialize()
.Repeat()
.StartWith((Notification<T>)null)
.Buffer(2, 1)
.Select(it => {
Console.WriteLine($"Buffer content: {String.Join(",", it)}");
if (it[1].Kind != System.Reactive.NotificationKind.OnCompleted) {
return it[1];
}
// it[1] is OnCompleted, check the previous one
if (it[0] != null && it[0].Kind != System.Reactive.NotificationKind.OnCompleted) {
// not a consecutive OnCompleted, so we ignore this OnCompleted with a NULL marker
return null;
}
// okay, we have two consecutive OnCompleted, stop this observable.
return it[1];
})
.Where(it => it != null) // remove the NULL marker
.Dematerialize();
}

这将生成以下输出:

counter is now: 0
Buffer content: ,OnNext(1)
SUBSCRIBE: 1
Buffer content: OnNext(1),OnCompleted()
counter is now: 1
Buffer content: OnCompleted(),OnNext(2)
SUBSCRIBE: 2
Buffer content: OnNext(2),OnCompleted()
counter is now: 2
Buffer content: OnCompleted(),OnNext(3)
SUBSCRIBE: 3
Buffer content: OnNext(3),OnCompleted()
counter is now: 3
Buffer content: OnCompleted(),OnNext(4)
SUBSCRIBE: 4
Buffer content: OnNext(4),OnCompleted()
counter is now: 4
Buffer content: OnCompleted(),OnNext(5)
SUBSCRIBE: 5
Buffer content: OnNext(5),OnCompleted()
counter is now: 5
Buffer content: OnCompleted(),OnNext(6)
SUBSCRIBE: 6
Buffer content: OnNext(6),OnCompleted()
counter is now: 6
Buffer content: OnCompleted(),OnNext(7)
SUBSCRIBE: 7
Buffer content: OnNext(7),OnCompleted()
counter is now: 7
Buffer content: OnCompleted(),OnNext(8)
SUBSCRIBE: 8
Buffer content: OnNext(8),OnCompleted()
counter is now: 8
Buffer content: OnCompleted(),OnNext(9)
SUBSCRIBE: 9
Buffer content: OnNext(9),OnCompleted()
counter is now: 9
Buffer content: OnCompleted(),OnCompleted()
SUBSCRIBE:Completed
Main thread terminated

我还没有测试过该代码如何处理OnError()通知,所以您可能需要检查一下。此外,我还遇到了source.Materialize().Repeat()部分将从原始源读取更多数据的问题,尽管它后来决定停止可观测数据。特别是对于Repeat0语句,我有时会收到额外的输出,如:

counter is now: 9
Buffer content: OnCompleted(),OnCompleted()
SUBSCRIBE: Completed
counter is now: 10
counter is now: 11
counter is now: 12
counter is now: 13
counter is now: 14

对于您来说,这可能也是一个问题,因为Repeat()部分仍在尝试读取/连接空的可观察器。

相关内容

  • 没有找到相关文章

最新更新