Rx.Observable.reduce的意外行为



我对下面代码的行为感到惊讶(参见https://plnkr.co/edit/OVc26DmXpvXqSOJsQAoh?p=preview):

)
  let empty = Observable.empty();
  let source = Observable.range(1, 5)
    .map(i =>
      Observable.timer(i * 2000, 1000).map(x => "source " + i + ": " + x).take(10))
    .reduce((s1, s2) => s1.takeUntil(s2).concat(s2), empty)
    .mergeAll();
   var subscription = source.subscribe(
    function (x) {
        console.log('Next: ' + x);
    },
    function (err) {
        console.log('Error: ' + err);   
    },
    function () {
        console.log('Completed');   
    });
收益率

Next: source 1: 0
Next: source 1: 1

——这里有一个长停顿——

Next: source 5: 0
Next: source 5: 1
Next: source 5: 2
Next: source 5: 3
Next: source 5: 4
Next: source 5: 5
Next: source 5: 6
Next: source 5: 7
Next: source 5: 8
Next: source 5: 9
Completed

,但我希望看到所有的序列显示在两者之间。出了什么问题?

<标题>编辑:

注意,使用share()并不总是治愈它。这个代码失败了:

   let originalSequence = Observable.timer(0, 1000).take(10).share();
   let empty = Observable.empty();
     let source = Observable.range(1, 5)
      .map(i =>
      originalSequence.delay(i * 2000).map(x => "source " + i + ": " + x))
    .reduce((s1, s2) => s1.takeUntil(s2).concat(s2), empty)
    .mergeAll(); 

和这段代码工作如我所期望的,我不明白为什么

 let empty = Observable.empty();
     let source = Observable.range(1, 5)
      .map(i =>
      Observable.timer(i * 2000, 1000).map(x => "source " + i + ": " + x).take(10).share())
    .reduce((s1, s2) => s1.takeUntil(s2).concat(s2), empty)
    .mergeAll();
<标题>编辑2:

c#版本也有一个我没有预料到的行为,但同时的行为不同:

using System;
using System.Linq;
using System.Reactive.Linq;
using System.Threading;
namespace RxScanProblem
{
    class Program
    {
        static void Main(string[] args)
        {
            var originalSequence = Observable.Timer(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1)).Take(10).Select(i => (long)i).Publish();
            var empty = Observable.Empty<string>();
            var source = Observable.Range(1, 5)
             .Select(i => originalSequence.Delay(TimeSpan.FromSeconds(2 * i)).Select(x => "source " + i + ": " + x))
             .Aggregate(empty, (s1, s2) => s1.TakeUntil(s2).Concat(s2))
             .SelectMany(x => x);
            source.Subscribe(
                s => Console.WriteLine("Next: " + s),
                ex => Console.WriteLine("Error: " + ex.Message),
                () => Console.WriteLine("Completed"));
            originalSequence.Connect();
            // Dirty, I know
            Thread.Sleep(20000);
        }
    }
}

产量(有一定延迟)

Next: source 1: 0
Next: source 1: 1
Next: source 1: 2
<标题> 3 编辑

switch()的行为也不像我期望的那样!

   let empty = Observable.empty();
     let source = Observable.range(1, 5)
      .map(i => Observable.timer(i * 2000, 1000).map(x => "source " + i + ": " + x).take(10))
      .switch();
收益率

Next: source 5: 0
Next: source 5: 1
Next: source 5: 2
Next: source 5: 3
Next: source 5: 4
Next: source 5: 5
Next: source 5: 6
Next: source 5: 7
Next: source 5: 8
Next: source 5: 9

与c#

相同的(!)行为
   var originalSequence = Observable.Timer(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1)).Take(10).Select(i => (long)i).Publish();
    var empty = Observable.Empty<string>();
    var source = Observable.Range(1, 5)
     .Select(i => originalSequence.Delay(TimeSpan.FromSeconds(2 * i)).Select(x => "source " + i + ": " + x))
     .Switch();

让我们看看你的代码是做什么的。

  let source = Observable.range(1, 5)
    .map(i =>
      Observable.timer(i * 2000, 1000).map(x => "source " + i + ": " + x).take(10))
    .reduce((s1, s2) => s1.takeUntil(s2).concat(s2), empty)
    .mergeAll();

第一张地图将把{1,2,3,4,5}变成

s1 = Observable.timer(1 * 2000, 1000).map(x => "source 1: " + x).take(10));
s2 = Observable.timer(2 * 2000, 1000).map(x => "source 2: " + x).take(10));
s3 = Observable.timer(3 * 2000, 1000).map(x => "source 3: " + x).take(10));
s4 = Observable.timer(4 * 2000, 1000).map(x => "source 4: " + x).take(10));
s5 = Observable.timer(5 * 2000, 1000).map(x => "source 5: " + x).take(10));

接下来,reduce会像这样将它们粘合在一起:

s1.takeUntil(s2).concat(s2)
   .takeUntil(s3).concat(s3)
   .takeUntil(s4).concat(s4)
   .takeUntil(s5).concat(s5)

现在让我们写一个小弹珠来显示所有这些流将产生什么:

s1               --0123456789
s2               ----0123456789
s3               ------0123456789
s4               --------0123456789
s5               ----------0123456789
s1.takeUntil(s2) --01|
  .concat(s2)    --01----0123456789
  takeUntil(s3)  --01--|
  .concat(s3)    --01--------0123456789
  takeUntil(s4)  --01----|
  .concat(s4)    --01------------0123456789
  takeUntil(s5)  --01------|
  .concat(s5)    --01----------------0123456789

现在,如果您使用share(),则可以有效地发布源代码。发布意味着您同时向所有订阅者进行多播。如果有2个订阅者,即使其中一个比另一个晚到达,源也会继续为第二个订阅者进行中流传输。当第一个用户在第二个用户到达之前断开连接时,情况就变了。为了保存资源,share()将断开源连接,然后重新订阅。如果你从冷观测开始,这意味着它们将重新开始,等待很长时间。

由于您使用.takeUntil(s2).concat(s2),实际上您将在再次订阅s2之前取消订阅s2。毕竟,concat直到从takeUntil接收到completed才会连接,takeUntil直到s2产生才会发出completed。如果s2屈服,takeUntil将在转发completed之前立即取消订阅。这意味着s2将在一瞬间没有订阅者,并且源将被重置。

您可能期望的是s2将始终保持连接,并将继续在后台运行。如果你使用的是由有源产生的热观测,而不是通过share()产生的冷观测,这将会有效。

我不会详细介绍switch(),因为我相信你已经理解了那里的问题:当下一个到达时,它将断开前一个源,而不是当下一个产生时。

你能做的,是写你自己的'switchOnYield'

source.publish(src => src
  .flatMap(inner1 => 
    inner1.takeUntil(src.flatMap(inner2 => inner2.take(1)))
  ))

这样做的是合并来自source的所有源代码,但将takeUntil与所有后面的源代码一起添加到它们上。如果后面的任何一个来源产生收益,第一批将被取消订阅。这是因为src第一次产生时,.flatMap(inner1就会运行。第二次生成时,src.flatMap(inner2将把来自后一个源的任何项合并到takeUntil操作符中。

demo

我强烈建议您发布预期或期望的输出。你想要什么还不清楚。对于c#示例,将源代码更改为以下内容会使您更接近(我认为,再次,我不确定):

var source = Observable.Range(1, 5)
    .Select(i => originalSequence
        .Delay(TimeSpan.FromSeconds(2 * i))
        .Select(x => "source " + i + ": " + x)
    )
    //making sure s2 is shared properly, thus concated properly
    .Aggregate(empty, (s1, s2) => s2.Publish( _s2 => s1
        .TakeUntil(_s2)
        .Concat(_s2)
    ))
    .SelectMany(x => x);

生成以下输出:

Next: source 1: 0
Next: source 1: 1
Next: source 2: 1
Next: source 3: 1
Next: source 4: 1
Next: source 5: 1
Next: source 5: 2
Next: source 5: 3
Next: source 5: 4
Next: source 5: 5
Next: source 5: 6
Next: source 5: 7
Next: source 5: 8
Next: source 5: 9
Completed

这对我来说是有意义的。如果你能发布想要的输出,我将帮助你实现。

相关内容

  • 没有找到相关文章

最新更新