反应式扩展:如何从IGroupedObservable中采样最新值



我有下面的单元测试,我试图对来自可通过其标识符观察到的源的对象进行分组,并每50个刻度对其进行采样。然而,具有GroupBySelectMany->CCD_ 3给出了与期望输出不同的输出。它返回OnNext@280,OnNext@380和OnCompleted@380.

当我只使用Sample方法运行测试时,输出满足预期,如AssertEqual调用中所示。

我在这里做错了什么?我希望两个版本都能产生相同的输出,因为在这种情况下,所有产品的标识符都是相同的。

Ps:我已经检查了关于同类操作的这个问题,以及关于分组节流+无功源单元测试的这个问题。

public class ObservableTests : ReactiveTest
{
[Fact]
public void sample_test()
{
// Arrange
var productPrice = new ProductPrice("1", 10);
var productPrice2 = new ProductPrice("1", 20);
var productPrice3 = new ProductPrice("1", 30);
var productPrice4 = new ProductPrice("1", 40);
var productPrice5 = new ProductPrice("1", 50);
var productPrice6 = new ProductPrice("1", 60);
var testScheduler = new TestScheduler();
var observable = testScheduler.CreateHotObservable(
OnNext(230, productPrice),
OnNext(260, productPrice2),
OnNext(280, productPrice3),
OnNext(340, productPrice4),
OnNext(360, productPrice5),
OnNext(380, productPrice6),
OnCompleted<ProductPrice>(380));
// Act
// WORKS
//var result = testScheduler.Start(
//    () =>
//        observable
//            .Sample(TimeSpan.FromTicks(50), testScheduler));
// DOESNT WORK
var result = testScheduler.Start(
() =>
observable
.GroupBy(value => value.Identifier)
.SelectMany(groupedObservable => groupedObservable.Sample(TimeSpan.FromTicks(50), testScheduler)));
result.Messages.AssertEqual(
OnNext(250, productPrice),
OnNext(300, productPrice3),
OnNext(350, productPrice4),
OnNext(400, productPrice6),
OnCompleted<ProductPrice>(400));
}
private class ProductPrice
{
public ProductPrice(string identifier, decimal price)
{
this.Identifier = identifier;
this.Price = price;
}
public string Identifier { get; }
public decimal Price { get; }
}

这是一个迟来的答案,但它完全符合预期。groupedObservable只有在第一个元素发出后才被创建和订阅——时间为230。因此,从那个时间点开始,采样将在该组中开始,在280、330、380等处给出结果…

在没有分组的工作示例中,可观察对象在刻度100处创建,并在200处订阅(TestScheduler.Start方法的此重载的默认设置(,因此第一个采样元素在250处发出。

最新更新