在我的上一篇文章中,我试图使用RxJS缓冲挂起的http请求。我原以为bufferCount
是我需要的,但我发现我的项目低于缓冲区大小,它只是等待,这不是我想要的。
我现在有了一个新的方案,使用take
。它似乎在做我想要做的事情,除了当我得到的可观察对象没有项目(剩下(时,完整的永远不会被调用。
我有如下的东西。。
const pendingRequests = this.store$.select(mySelects.getPendingRequests).pipe(
// FlatMap turns the observable of a single Requests[] to observable of Requests
flatMap(x => x),
// Only get requests unprocessed
filter(x => x.processedState === ProcessedState.unprocessed),
// Batches of batchSize in each emit
take(3),
);
let requestsSent = false;
pendingRequests.subscribe(nextRequest => {
requestsSent = true;
this.sendRequest(nextEvent);
},
error => {
this.logger.error(`${this.moduleName}.sendRequest: Error ${error}`);
},
() => {
// **** This is not called if pendingRequests is empty ****
if (requestsSent ) {
this.store$.dispatch(myActions.continuePolling());
} else {
this.store$.dispatch(myActions.stopPolling());
}
}
);
因此,take(3)
将获得接下来的3个挂起的请求并发送它们((,在那里我还调度了一个操作,将已处理的状态设置为非ProcessedState.pending,这样我们就不会在下一次轮询中获得它们(
这一切都很好,但当pendingRequests
最终什么都不返回(为空(时,用***标记的completed
块。未调用。我本以为这会被立即调用。
我不确定这是否重要,因为由于我当时没有采取行动继续投票,投票确实停止了。
但我最担心的是,如果pendingRequests
没有完成,我是否需要取消订阅以防止任何泄露?我假设如果呼叫了complete
,我不需要取消订阅?
更新
为了使pendingReguests
始终完整,我采取了一种稍微不同的方法。而不是使用CCD_ 9运算符来";滤波器";,我每次都会得到整个列表,上面只有take(1)
。我总是会得到列表,即使它是空的,所以pendingReguests
每次都会完成。
即
const pendingRequests = this.store$.select(mySelects.getPendingRequests).pipe(take(1))
然后我可以在可观察的内部进行过滤和批处理。。
pendingRequests.subscribe(nextRequest => {
let requestToSend = nextRequest.filter(x => x.processedState === ProcessedState.unprocessed);
const totalPendingCount = requestToSend.length;
requestToSend = requestToSend slice(0, this.batchSize);
for (const nextRequest of requestToSend) {
this.sendRequest(nextRequest);
}
if (totalPendingCount > this.batchSize) {
this.store$.dispatch(myActions.continuePolling());
}
到目前为止,在我的测试中,我总是能发射complete
。
此外,通过两个操作(一个startPolling和一个continuePolling(,我可以将延迟放在continuePolling中,所以当我们第一次开始轮询时(例如,应用程序在超出网络范围后刚刚恢复在线(,我们会立即提交,只有当我们的超过批量时才会延迟
也许这不是100%的";rxy";做这件事的方式,但到目前为止似乎有效。这里有什么问题吗?
我会用toArray
替换take
,然后再替换一些缓冲逻辑。
这就是代码的样子。我添加了delay
逻辑,我认为这是您上一篇文章所建议的,并提供了评论来描述添加的每一行
// implementation of the chunk function used below
// https://www.w3resource.com/javascript-exercises/fundamental/javascript-fundamental-exercise-265.php
const chunk = (arr, size) =>
Array.from({ length: Math.ceil(arr.length / size) }, (v, i) =>
arr.slice(i * size, i * size + size)
);
const pendingRequests = this.store$.select(mySelects.getPendingRequests).pipe(
// FlatMap turns the observable of a single Requests[] to observable of Requests
flatMap(x => x),
// Only get requests unprocessed
filter(x => x.processedState === ProcessedState.unprocessed),
// Read all the requests and store them in an array
toArray(),
// Split the array in chunks of the specified size, in this case 3
map(arr => chunk(arr, 3)), // the implementation of chunk is provided above
// Create a stream of chunks
concatMap((chunks) => from(chunks)),
// make sure each chunk is emitted after a certain delay, e.g. 2 sec
concatMap((chunk) => of(chunk).pipe(delay(2000))),
// mergeMap to turn an array into a stream
mergeMap((val) => val)
);
let requestsSent = false;
pendingRequests.subscribe(nextRequest => {
requestsSent = true;
this.sendRequest(nextEvent);
},
error => {
this.logger.error(`${this.moduleName}.sendRequest: Error ${error}`);
},
() => {
// **** THIS NOW SHOULD BE CALLED ****
if (requestsSent ) {
this.store$.dispatch(myActions.continuePolling());
} else {
this.store$.dispatch(myActions.stopPolling());
}
}
);
我怀疑pendingRequests
是否会自己完成。至少在ngrx中,Store
是BehaviorSubject
。因此,无论何时执行store.select()
或store.pipe(select())
,都只是将另一个订阅者添加到由BehaviorSubject
维护的内部订阅者列表中。
BehaviorSubject
扩展了Subject
,以下是订阅Subject
时发生的情况:
this.observers.push(subscriber);
在您的情况下,您使用的是take(3)
。在3个值之后,take
将发出一个完整的通知,因此应该调用complete
回调。由于整个链实际上是BehaviorSubject
的订阅者,它将在complete
通知中将自己从订阅者列表中删除。
我假设如果调用完整版,我不需要取消订阅
以下是当订阅者(例如TakeSubscriber
(完成时会发生的情况:
protected _complete(): void {
this.destination.complete();
this.unsubscribe();
}
因此,如果已经发生complete
/error
通知,则无需取消订阅。