反应式扩展:缓冲直到其他流完成



我发出请求并等待响应流上对该请求的响应。在等待期间,更新流上可能会有更新。

我希望在回复流完成后进行这些更新。喜欢这个:

Response (cold)   |      x y z |
Updates (hot)     | 1 2    3        4 |
Result            |      x y z 123  4 |

我似乎做不好。是否有一些聪明的运算符组合可以满足我的需求?

从问题的图表中:

缓冲所有更新,直到响应完成,然后按显示的其余更新显示。响应会立即显示。

updates.TakeUntil(response.LastAsync())
       .ToArray()
       .SelectMany(x => x)
       .Concat(updates)
       .Merge(response);

这将起作用,尽管你不能作为单行代码做到这一点的事实困扰着我:

var updatesReplayed = updates.Replay();
updatesReplayed.Connect();
var results = response.Concat(updatesReplayed);

以下是演示该功能的测试:

// time               | 1 2 3 4 5 6 7 8 9
// Response(cold)     |     x y z |
// Updates(hot)       | 1 2   3       4 |
// Result             |     x y z 123 4 |
var scheduler = new TestScheduler();
    var updates = scheduler.CreateHotObservable(
        ReactiveTest.OnNext(100.ToMsTicks(), "1"),
        ReactiveTest.OnNext(200.ToMsTicks(), "2"),
        ReactiveTest.OnNext(400.ToMsTicks(), "3"),
        ReactiveTest.OnNext(800.ToMsTicks(), "4"),
        ReactiveTest.OnCompleted<string>(900.ToMsTicks())
    );
    var response = scheduler.CreateColdObservable(
        ReactiveTest.OnNext(300.ToMsTicks(), "x"),
        ReactiveTest.OnNext(400.ToMsTicks(), "y"),
        ReactiveTest.OnNext(500.ToMsTicks(), "z"),
        ReactiveTest.OnCompleted<string>(600.ToMsTicks())
    );
    var expectedResults = scheduler.CreateHotObservable(
        ReactiveTest.OnNext(300.ToMsTicks(), "x"),
        ReactiveTest.OnNext(400.ToMsTicks(), "y"),
        ReactiveTest.OnNext(500.ToMsTicks(), "z"),
        ReactiveTest.OnNext(600.ToMsTicks(), "1"),
        ReactiveTest.OnNext(600.ToMsTicks(), "2"),
        ReactiveTest.OnNext(600.ToMsTicks(), "3"),
        ReactiveTest.OnNext(800.ToMsTicks(), "4"),
        ReactiveTest.OnCompleted<string>(900.ToMsTicks())
    );
    var replayed = updates.Replay();
    replayed.Connect();
    var results = response.Concat(replayed);
    var observer = scheduler.CreateObserver<string>();
    results.Subscribe(observer);
    scheduler.Start();
    ReactiveAssert.AreElementsEqual(expectedResults.Messages, observer.Messages);

相关内容

  • 没有找到相关文章

最新更新