我对下面代码的行为感到惊讶(参见https://plnkr.co/edit/OVc26DmXpvXqSOJsQAoh?p=preview):
) let empty = Observable.empty();
let source = Observable.range(1, 5)
.map(i =>
Observable.timer(i * 2000, 1000).map(x => "source " + i + ": " + x).take(10))
.reduce((s1, s2) => s1.takeUntil(s2).concat(s2), empty)
.mergeAll();
var subscription = source.subscribe(
function (x) {
console.log('Next: ' + x);
},
function (err) {
console.log('Error: ' + err);
},
function () {
console.log('Completed');
});
收益率Next: source 1: 0
Next: source 1: 1
——这里有一个长停顿——
Next: source 5: 0
Next: source 5: 1
Next: source 5: 2
Next: source 5: 3
Next: source 5: 4
Next: source 5: 5
Next: source 5: 6
Next: source 5: 7
Next: source 5: 8
Next: source 5: 9
Completed
,但我希望看到所有的序列显示在两者之间。出了什么问题?
<标题>编辑:注意,使用share()
并不总是治愈它。这个代码失败了:
let originalSequence = Observable.timer(0, 1000).take(10).share();
let empty = Observable.empty();
let source = Observable.range(1, 5)
.map(i =>
originalSequence.delay(i * 2000).map(x => "source " + i + ": " + x))
.reduce((s1, s2) => s1.takeUntil(s2).concat(s2), empty)
.mergeAll();
和这段代码工作如我所期望的,我不明白为什么
let empty = Observable.empty();
let source = Observable.range(1, 5)
.map(i =>
Observable.timer(i * 2000, 1000).map(x => "source " + i + ": " + x).take(10).share())
.reduce((s1, s2) => s1.takeUntil(s2).concat(s2), empty)
.mergeAll();
<标题>编辑2:c#版本也有一个我没有预料到的行为,但同时的行为不同:
using System;
using System.Linq;
using System.Reactive.Linq;
using System.Threading;
namespace RxScanProblem
{
class Program
{
static void Main(string[] args)
{
var originalSequence = Observable.Timer(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1)).Take(10).Select(i => (long)i).Publish();
var empty = Observable.Empty<string>();
var source = Observable.Range(1, 5)
.Select(i => originalSequence.Delay(TimeSpan.FromSeconds(2 * i)).Select(x => "source " + i + ": " + x))
.Aggregate(empty, (s1, s2) => s1.TakeUntil(s2).Concat(s2))
.SelectMany(x => x);
source.Subscribe(
s => Console.WriteLine("Next: " + s),
ex => Console.WriteLine("Error: " + ex.Message),
() => Console.WriteLine("Completed"));
originalSequence.Connect();
// Dirty, I know
Thread.Sleep(20000);
}
}
}
产量(有一定延迟)
Next: source 1: 0
Next: source 1: 1
Next: source 1: 2
<标题> 3 编辑switch()的行为也不像我期望的那样!
let empty = Observable.empty();
let source = Observable.range(1, 5)
.map(i => Observable.timer(i * 2000, 1000).map(x => "source " + i + ": " + x).take(10))
.switch();
收益率Next: source 5: 0
Next: source 5: 1
Next: source 5: 2
Next: source 5: 3
Next: source 5: 4
Next: source 5: 5
Next: source 5: 6
Next: source 5: 7
Next: source 5: 8
Next: source 5: 9
与c#
相同的(!)行为 var originalSequence = Observable.Timer(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1)).Take(10).Select(i => (long)i).Publish();
var empty = Observable.Empty<string>();
var source = Observable.Range(1, 5)
.Select(i => originalSequence.Delay(TimeSpan.FromSeconds(2 * i)).Select(x => "source " + i + ": " + x))
.Switch();
标题>标题>标题>让我们看看你的代码是做什么的。
let source = Observable.range(1, 5)
.map(i =>
Observable.timer(i * 2000, 1000).map(x => "source " + i + ": " + x).take(10))
.reduce((s1, s2) => s1.takeUntil(s2).concat(s2), empty)
.mergeAll();
第一张地图将把{1,2,3,4,5}
变成
s1 = Observable.timer(1 * 2000, 1000).map(x => "source 1: " + x).take(10));
s2 = Observable.timer(2 * 2000, 1000).map(x => "source 2: " + x).take(10));
s3 = Observable.timer(3 * 2000, 1000).map(x => "source 3: " + x).take(10));
s4 = Observable.timer(4 * 2000, 1000).map(x => "source 4: " + x).take(10));
s5 = Observable.timer(5 * 2000, 1000).map(x => "source 5: " + x).take(10));
接下来,reduce
会像这样将它们粘合在一起:
s1.takeUntil(s2).concat(s2)
.takeUntil(s3).concat(s3)
.takeUntil(s4).concat(s4)
.takeUntil(s5).concat(s5)
现在让我们写一个小弹珠来显示所有这些流将产生什么:
s1 --0123456789
s2 ----0123456789
s3 ------0123456789
s4 --------0123456789
s5 ----------0123456789
s1.takeUntil(s2) --01|
.concat(s2) --01----0123456789
takeUntil(s3) --01--|
.concat(s3) --01--------0123456789
takeUntil(s4) --01----|
.concat(s4) --01------------0123456789
takeUntil(s5) --01------|
.concat(s5) --01----------------0123456789
现在,如果您使用share()
,则可以有效地发布源代码。发布意味着您同时向所有订阅者进行多播。如果有2个订阅者,即使其中一个比另一个晚到达,源也会继续为第二个订阅者进行中流传输。当第一个用户在第二个用户到达之前断开连接时,情况就变了。为了保存资源,share()
将断开源连接,然后重新订阅。如果你从冷观测开始,这意味着它们将重新开始,等待很长时间。
由于您使用.takeUntil(s2).concat(s2)
,实际上您将在再次订阅s2
之前取消订阅s2
。毕竟,concat
直到从takeUntil
接收到completed
才会连接,takeUntil
直到s2
产生才会发出completed
。如果s2
屈服,takeUntil将在转发completed
之前立即取消订阅。这意味着s2将在一瞬间没有订阅者,并且源将被重置。
您可能期望的是s2
将始终保持连接,并将继续在后台运行。如果你使用的是由有源产生的热观测,而不是通过share()
产生的冷观测,这将会有效。
我不会详细介绍switch()
,因为我相信你已经理解了那里的问题:当下一个到达时,它将断开前一个源,而不是当下一个产生时。
你能做的,是写你自己的'switchOnYield'
source.publish(src => src
.flatMap(inner1 =>
inner1.takeUntil(src.flatMap(inner2 => inner2.take(1)))
))
这样做的是合并来自source的所有源代码,但将takeUntil
与所有后面的源代码一起添加到它们上。如果后面的任何一个来源产生收益,第一批将被取消订阅。这是因为src
第一次产生时,.flatMap(inner1
就会运行。第二次生成时,src.flatMap(inner2
将把来自后一个源的任何项合并到takeUntil
操作符中。
我强烈建议您发布预期或期望的输出。你想要什么还不清楚。对于c#示例,将源代码更改为以下内容会使您更接近(我认为,再次,我不确定):
var source = Observable.Range(1, 5)
.Select(i => originalSequence
.Delay(TimeSpan.FromSeconds(2 * i))
.Select(x => "source " + i + ": " + x)
)
//making sure s2 is shared properly, thus concated properly
.Aggregate(empty, (s1, s2) => s2.Publish( _s2 => s1
.TakeUntil(_s2)
.Concat(_s2)
))
.SelectMany(x => x);
生成以下输出:
Next: source 1: 0
Next: source 1: 1
Next: source 2: 1
Next: source 3: 1
Next: source 4: 1
Next: source 5: 1
Next: source 5: 2
Next: source 5: 3
Next: source 5: 4
Next: source 5: 5
Next: source 5: 6
Next: source 5: 7
Next: source 5: 8
Next: source 5: 9
Completed
这对我来说是有意义的。如果你能发布想要的输出,我将帮助你实现。