Below我的代码运行良好。
但是,我认为其他observable
($fromArray(的运算符(映射中的combineLatest
很奇怪。
如何将combineLatest
作为observable
本身?
const item = ["television", "pc", "radio", "camera"];
const fooMarket = webSocket("wss://api.fooMarket.com/websocket/");
const barMarket = webSocket("wss://api.barMarket.com/websocket/");
// market WS unit Stream is like below (emit real-time price change for each Item, but Arbitrary Item Emission)
// {
// itemName : "televison",
// price: 980
// }
const fromArray$ = of(...item).pipe(
map((x) => {
const fooItem = fooMarket.pipe(
filter((y) => y.itemName === x),
map((z) => ({ itemName, price: item.price }))
);
const barItem = barMarket.pipe(
filter((y) => y.itemName === x),
map((z) => ({ itemName, price: item.price }))
);
combineLatest({ [`foo-${x}`]: fooItem, [`var-${x}`]: barItem }).subscribe(
console.log
);
// return combineLatest({ [`foo-${x}`]: fooItem, [`var-${x}`]: barItem }).subscribe; // this way makes fromArray$'s type as 'Subscription' (not 'Observable' type)
})
);
fromArray$.subscribe();
// result is like below
// {
// "foo-television": { itemName : "television", price: 980 },
// "bar-television : { itemName : "television", price: 950 }
// }
// {
// "foo-pc": { itemName : "pc", price: 110 },
// "bar-pc : { itemName : "pc", price: 120 }
// }
// ...continuing as Real-time Price Change
你的问题我还不完全清楚,但我认为你想去掉嵌套订阅。(如果我错了,请纠正我(
为此,您只需要将map
运算符替换为mergeMap
,并返回投影函数中可观察到的combineLatest
。
像这样:
const fromArray$ = of(...item).pipe(
mergeMap((x) => {
...
return combineLatest({ [`foo-${x}`]: fooItem, [`var-${x}`]: barItem })
})
);
fromArray$.subscribe();
编辑:根据评论中的要求,获取组合数组Lastest的替代方法
作为一种选择,如果您想获得combineLatest
Observable的数组,并有可能单独或整体处理它们,您可以将mergeMap逻辑提取到一个函数,然后将项的数组映射到combineLastest的数组。
function livePricingFor(itemName: string) {
const fooItem = fooMarket.pipe(
filter((item) => item.itemName === itemName),
map((item) => ({ itemName, price: item.price }))
);
const barItem = barMarket.pipe(
filter((item) => item.itemName === itemName),
map((item) => ({ itemName, price: item.price }))
);
return combineLatest({
[`foo-${itemName}`]: fooItem,
[`bar-${itemName}`]: barItem,
});
}
...
const items = ["television", "pc", "radio", "camera"];
const itemsLivePricingObservables = items.map(livePricingFor); // Array of combineLastest
完成此操作后,要订阅所有内容,您可以使用合并运营商
merge(...itemsLivePricingObservables).subscribe()
或者,如果你只想订阅其中一个,你可以通过数组索引访问它
itemsLivePricingObservables[0].subscribe();
欢呼