不为空时可观察到的默认值



我有一些代码正在从数据库中读取,迭代每一行数据并对其执行一些逻辑,然后创建一个可观测值,该可观测值随后写入数据库,将其添加到数组中(创建可观测值数组(,这样,当通过forkJoin订阅可观测值阵列时,所有必要的数据都会写入数据库。

在阵列中的可观测数量变得相当大之前,这似乎非常好。行的数量可以在0-6000之间,因此数组的大小可以增长到这个值。当它达到这个大小时,observable不再写入数据库,而是从defaultIfEmpty返回默认值。我很困惑为什么它在较小的可观测量下正常工作,但在较大的量下突然变空。。。

通过的代码示例可能会更清楚一些

function writeToDB() {
// rows taken from the database, n = 0..6000
data = []
// array of observables
observables = []
for (const row of data) {
if (row.age > 20) {
// websocket between service and database, returns an observable
const observable = websocket.put(row).pipe(
o$.catchError((err) => { 
return r$.of(err) 
}),
o$.defaultIfEmpty({
success: true,
status: 200
})
);
observables.push(observable);
}
}
return forkJoin([...observables]);
}

使用此示例在订阅时可以非常好地工作,除非使用数组observables长度约为5000的大型数据集。在这一点上,它开始返回defaultIfEmpty{ success: true, status: 200 },我无法锻炼为什么。。。如有任何帮助或建议,我们将不胜感激。

从您在这里显示的内容中还不清楚。尽管如此,如果这适用于较少数量的呼叫,那么websocket很有可能在这些号码上表现出一些奇怪的行为。

值得一试的可能是限制websocket调用的并发性。

function writeToDB(data) {
// data contains rows taken from the database, n = 0..6000
return from(data).pipe(
filter(row => row.age > 20),
map(row => websocket.put(row).pipe(

catchError(err => of(err)),
// last makes sure that mergeAll behaves like forkJoin
last(undefined, {
success: true,
status: 200
})
)),
// mergeAll lets you choose how many can run concurrently
// for example, at most 50 websocket calls are made at
// once here
mergeAll(50),
toArray()
);

}

在这种情况下,我更喜欢mapmergeAll而不是mergeMap(因为我认为您不太可能错过它的并发方面(,但您可以使用任何一种。


function writeToDB(data) {
// data contains rows taken from the database, n = 0..6000
return from(data).pipe(
filter(row => row.age > 20),
mergeMap(row => websocket.put(row).pipe(

catchError(err => of(err)),
// last makes sure that mergeMap behaves like forkJoin
last(undefined, {
success: true,
status: 200
})
), 50), // <- sneaky! ;)
toArray()
);

}

最新更新