在RxJS中,是否应该在不发射项目的可观察对象中调用complete



在我的上一篇文章中,我试图使用RxJS缓冲挂起的http请求。我原以为bufferCount是我需要的,但我发现我的项目低于缓冲区大小,它只是等待,这不是我想要的。

我现在有了一个新的方案,使用take。它似乎在做我想要做的事情,除了当我得到的可观察对象没有项目(剩下(时,完整的永远不会被调用。

我有如下的东西。。

const pendingRequests = this.store$.select(mySelects.getPendingRequests).pipe(
// FlatMap turns the observable of a single Requests[] to observable of Requests
flatMap(x => x),
// Only get requests unprocessed
filter(x => x.processedState === ProcessedState.unprocessed),
// Batches of batchSize in each emit
take(3),
);

let requestsSent = false;
pendingRequests.subscribe(nextRequest => {           
requestsSent = true;
this.sendRequest(nextEvent);           
},
error => {            
this.logger.error(`${this.moduleName}.sendRequest:  Error ${error}`);
},
() => {         
// ****  This is not called if pendingRequests is empty ****
if (requestsSent ) {              
this.store$.dispatch(myActions.continuePolling());              
} else {
this.store$.dispatch(myActions.stopPolling());              
}
}
);

因此,take(3)将获得接下来的3个挂起的请求并发送它们((,在那里我还调度了一个操作,将已处理的状态设置为非ProcessedState.pending,这样我们就不会在下一次轮询中获得它们(

这一切都很好,但当pendingRequests最终什么都不返回(为空(时,用***标记的completed块。未调用。我本以为这会被立即调用。

我不确定这是否重要,因为由于我当时没有采取行动继续投票,投票确实停止了。

但我最担心的是,如果pendingRequests没有完成,我是否需要取消订阅以防止任何泄露?我假设如果呼叫了complete,我不需要取消订阅?

更新

为了使pendingReguests始终完整,我采取了一种稍微不同的方法。而不是使用CCD_ 9运算符来";滤波器";,我每次都会得到整个列表,上面只有take(1)。我总是会得到列表,即使它是空的,所以pendingReguests每次都会完成。

const pendingRequests = this.store$.select(mySelects.getPendingRequests).pipe(take(1))

然后我可以在可观察的内部进行过滤和批处理。。

pendingRequests.subscribe(nextRequest => {
let requestToSend = nextRequest.filter(x => x.processedState === ProcessedState.unprocessed);
const totalPendingCount = requestToSend.length;
requestToSend = requestToSend slice(0, this.batchSize);          
for (const nextRequest of requestToSend) {                  
this.sendRequest(nextRequest);            
}
if (totalPendingCount > this.batchSize) {
this.store$.dispatch(myActions.continuePolling());            
}

到目前为止,在我的测试中,我总是能发射complete

此外,通过两个操作(一个startPolling和一个continuePolling(,我可以将延迟放在continuePolling中,所以当我们第一次开始轮询时(例如,应用程序在超出网络范围后刚刚恢复在线(,我们会立即提交,只有当我们的超过批量时才会延迟

也许这不是100%的";rxy";做这件事的方式,但到目前为止似乎有效。这里有什么问题吗?

我会用toArray替换take,然后再替换一些缓冲逻辑。

这就是代码的样子。我添加了delay逻辑,我认为这是您上一篇文章所建议的,并提供了评论来描述添加的每一行

// implementation of the chunk function used below
// https://www.w3resource.com/javascript-exercises/fundamental/javascript-fundamental-exercise-265.php
const chunk = (arr, size) =>
Array.from({ length: Math.ceil(arr.length / size) }, (v, i) =>
arr.slice(i * size, i * size + size)
);
const pendingRequests = this.store$.select(mySelects.getPendingRequests).pipe(
// FlatMap turns the observable of a single Requests[] to observable of Requests
flatMap(x => x),
// Only get requests unprocessed
filter(x => x.processedState === ProcessedState.unprocessed),
// Read all the requests and store them in an array
toArray(),
// Split the array in chunks of the specified size, in this case 3
map(arr => chunk(arr, 3)), // the implementation of chunk is provided above
// Create a stream of chunks
concatMap((chunks) => from(chunks)),
// make sure each chunk is emitted after a certain delay, e.g. 2 sec
concatMap((chunk) => of(chunk).pipe(delay(2000))),
// mergeMap to turn an array into a stream
mergeMap((val) => val)
);
let requestsSent = false;
pendingRequests.subscribe(nextRequest => {           
requestsSent = true;
this.sendRequest(nextEvent);           
},
error => {            
this.logger.error(`${this.moduleName}.sendRequest:  Error ${error}`);
},
() => {         
// ****  THIS NOW SHOULD BE CALLED ****
if (requestsSent ) {              
this.store$.dispatch(myActions.continuePolling());              
} else {
this.store$.dispatch(myActions.stopPolling());              
}
}
);

我怀疑pendingRequests是否会自己完成。至少在ngrx中,StoreBehaviorSubject。因此,无论何时执行store.select()store.pipe(select()),都只是将另一个订阅者添加到由BehaviorSubject维护的内部订阅者列表中。

BehaviorSubject扩展了Subject,以下是订阅Subject时发生的情况:

this.observers.push(subscriber);

在您的情况下,您使用的是take(3)。在3个值之后,take将发出一个完整的通知,因此应该调用complete回调。由于整个链实际上是BehaviorSubject的订阅者,它将在complete通知中将自己从订阅者列表中删除。

我假设如果调用完整版,我不需要取消订阅

以下是当订阅者(例如TakeSubscriber(完成时会发生的情况:

protected _complete(): void {
this.destination.complete();
this.unsubscribe();
}

因此,如果已经发生complete/error通知,则无需取消订阅。

最新更新