rx.js将combineLatest与内部WebSocket主题组合为Observable



Below我的代码运行良好。

但是,我认为其他observable($fromArray(的运算符(映射中的combineLatest很奇怪。

如何将combineLatest作为observable本身?

const item = ["television", "pc", "radio", "camera"];
const fooMarket = webSocket("wss://api.fooMarket.com/websocket/");
const barMarket = webSocket("wss://api.barMarket.com/websocket/");
// market WS unit Stream is like below (emit real-time price change for each Item, but Arbitrary Item Emission)
// {
//   itemName : "televison",
//   price: 980
// }
const fromArray$ = of(...item).pipe(
map((x) => {
const fooItem = fooMarket.pipe(
filter((y) => y.itemName === x),
map((z) => ({ itemName, price: item.price }))
);
const barItem = barMarket.pipe(
filter((y) => y.itemName === x),
map((z) => ({ itemName, price: item.price }))
);
combineLatest({ [`foo-${x}`]: fooItem, [`var-${x}`]: barItem }).subscribe(
console.log
);
// return combineLatest({ [`foo-${x}`]: fooItem, [`var-${x}`]: barItem }).subscribe; // this way makes fromArray$'s type as 'Subscription' (not 'Observable' type)
})
);
fromArray$.subscribe();
// result is like below
// {
//   "foo-television": { itemName : "television", price: 980 },
//   "bar-television : { itemName : "television", price: 950 }
// }
// {
//   "foo-pc": { itemName : "pc", price: 110 },
//   "bar-pc : { itemName : "pc", price: 120 }
// }
// ...continuing as Real-time Price Change

你的问题我还不完全清楚,但我认为你想去掉嵌套订阅。(如果我错了,请纠正我(

为此,您只需要将map运算符替换为mergeMap,并返回投影函数中可观察到的combineLatest

像这样:

const fromArray$ = of(...item).pipe(
mergeMap((x) => {
...
return combineLatest({ [`foo-${x}`]: fooItem, [`var-${x}`]: barItem })
})
);
fromArray$.subscribe();

编辑:根据评论中的要求,获取组合数组Lastest的替代方法

作为一种选择,如果您想获得combineLatestObservable的数组,并有可能单独或整体处理它们,您可以将mergeMap逻辑提取到一个函数,然后将项的数组映射到combineLastest的数组。

function livePricingFor(itemName: string) {
const fooItem = fooMarket.pipe(
filter((item) => item.itemName === itemName),
map((item) => ({ itemName, price: item.price }))
);
const barItem = barMarket.pipe(
filter((item) => item.itemName === itemName),
map((item) => ({ itemName, price: item.price }))
);
return combineLatest({
[`foo-${itemName}`]: fooItem,
[`bar-${itemName}`]: barItem,
});
}
... 
const items = ["television", "pc", "radio", "camera"];
const itemsLivePricingObservables = items.map(livePricingFor); // Array of combineLastest

完成此操作后,要订阅所有内容,您可以使用合并运营商

merge(...itemsLivePricingObservables).subscribe()

或者,如果你只想订阅其中一个,你可以通过数组索引访问它

itemsLivePricingObservables[0].subscribe();

欢呼

最新更新