如何合并不同数据的Rx流



下面是我试图用Rx实现的一个简化场景。

public class A
{
public int Id { get; set; }
public string Name { get; set; }
};
public class B
{
public int Id { get; set; }
public int AId { get; set; }
public string SomeProperty { get; set; }
};
public class C
{
public A MyA { get; set; }
public IObservable<B> MyBObservable { get; set; }
}

这是一个数据序列

A(1,"Fred")

A(2,"汤姆")

A(3,"哈利")

B(100,1,"测试")

B(101,3,"xxx")

B(102,1,"测试2")

A(4,"Jane")

注意:在对应的a实例之前不能有B实例。

我想得到一个看起来像的结果

C(A(1,"Fred"),I可观测(B(100,1,"测试"),B(102,1,"测试2")

C(A(2,"Tom"),I可观测空)

C(A(3,"Harry"),I可观测(B(101,3,"xxx")

C(A(4,"Jane"),I可观测空)

结果必须包括A的所有实例(即使该A还没有B)

我有一些用户代码

IObservable<A> obsA = Observable.Create<A> { ... };
IObservable<B> obsB = Observable.Create<B> { ... };
obsB.Publish(); // shared
// what goes here ?
obsB.Connect();

我当前的尝试使用SelectMany,但并没有完全达到我想要的效果,因为我只得到A和B出现的条目。我还需要那些A没有B.的条目

有可能做一些类似的事情吗

obsA.SelectMany(a => new C { MyA = a, MyBObservable = obsB.Where(x => x.AId == a.Id) });

如果有帮助的话,obsA是一个冷的可观察的,并且会完成。obsB既可以是热的,也可以是冷的。如果C是可观察的或是一个列表,我并不介意。

非常感谢您的帮助。

问候Alan

IObservable<A> obsA = Observable.Create<A> { ... };
IObservable<B> obsB = Observable.Create<B> { ... };
var c = from a in obsA
join b in obsB on a.Id equals b.Id
select new C(a, b);

经过更多的实验,我得到了以下

obsA.Subscribe(a => new C (a, obsB.Where(x => x.AId == a.Id)));

然后在C 的构造函数中

public C (MyA a, IObservable<B> obsB)
{
MyA = a;
obsB.Subscribe(o => .. do something ...);
}

在我的版本中,类C不包含可观察的结果,而是包含对返回的每个项执行某些操作的结果。

相关内容

  • 没有找到相关文章

最新更新