我想使用Subject
具有并发= 1的异步任务队列。
收集新任务比执行这些任务更快,因此,作为性能优化,我希望处理所有未处理的任务。
下面的代码一个任务一个任务地检查:
this.tasks
.pipe(
mergeMap(async task => {
await this.processTasks([task])
}, 1),
)
我想把上面的代码转换成类似的东西:
this.tasks
.pipe(
mergeMap___AllUnprocessedUntilNow(async tasks => {
await this.processTasks(tasks)
}, 1),
)
- 我不能使用
bufferTime
或bufferCount
,因为它们会给每个新任务引入额外的延迟。
实际运行结果:
* adding task 1
* procssing task 1 (takes alot of time to perform it)
* meanwhile, adding more tasks: 2,3,4
* processed task 1
* procssing task 2
* processed task 2
* procssing task 3
* processed task 3
* procssing task 4
* processed task 4
预期运行结果:
* adding task 1
* procssing task 1 (takes alot of time to perform it)
* meanwhile, adding more tasks: 2,3,4
* processed task 1
* procssing task 2,3,4
* processed task 2,3,4
我有一个自定义操作符。需要注意的是,有一些细节可能会让事情变得比你想象的要复杂。最重要的是,这有点过度设计,但听起来你可以使用默认值来得到你想要的。
自定义RxJS操作符:bufferedexhastmap
function bufferedExhaustMap<T, R>(
project: (v: T[]) => ObservableInput<R>,
minBufferLength = 0,
minBufferCount = 1,
concurrent = 1
): OperatorFunction<T, R> {
function diffCounter(){
const incOrDec = new Subject<boolean>();
return {
onAvailable: incOrDec.pipe(
scan((acc, curr) => (curr ? ++acc : --acc), 0),
startWith(0),
shareReplay(1),
first(count => count < concurrent),
mapTo(true)
),
start: () => incOrDec.next(true),
end: () => incOrDec.next(false)
};
}
return source => defer(() => {
const projectCount = diffCounter();
const shared = source.pipe(share());
const nextBufferTime = () => forkJoin([
shared.pipe(take(minBufferCount), delay(0)),
timer(minBufferLength),
projectCount.onAvailable
]);
return shared.pipe(
bufferWhen(nextBufferTime),
delayWhen(() => projectCount.onAvailable),
tap(projectCount.start),
map(project),
mergeMap(projected => from(projected).pipe(
finalize(projectCount.end))
)
);
});
}
使用注意,我已经删除了一些多余的承诺/RxJS混合,因为它是一个代码气味。
this.tasks.pipe(
mergeMap___AllUnprocessedUntilNow(async tasks => {
await this.processTasks(tasks)
}, 1),
)
是
this.tasks.pipe(
mergeMap___AllUnprocessedUntilNow(
tasks => this.processTasks(tasks)
, 1
)
)
(请记住,所有高阶RxJS操作符都会将可迭代对象和承诺转换为可观察对象。否则,您可以使用from
操作符代替)。
由于bufferedExhaustMap
的默认并发数为1,因此可以使用new操作符编写如下:
this.tasks.pipe(
bufferedExhaustMap(
tasks => this.processTasks(tasks)
)
)
您可以使用buffer
和concatMap
的组合,以及filter
和tap
的一点帮助来完成此操作。
- 接收到第一次排放时
- 工作完成后
const release$ = new Subject<void>();
let releaseOnNextEmit = true;
const work$ = item$.pipe(
tap(() => {
if(releaseOnNextEmit) setTimeout(() => release$.next(), 0);
}),
buffer(release$),
tap(tasks => releaseOnNextEmit = tasks.length === 0),
filter(tasks => tasks.length > 0),
concatMap(tasks => processTasks(tasks)),
tap(() => release$.next())
);
你可以看到我们使用Subject来触发缓冲区的释放,并使用releaseOnNextEmit
标志来指示是否接收到一个发射应该触发缓冲区的释放。
缓冲区发出后,相应地设置标志:
true
false
非空时
filter
用于防止将空数组传递给concatMap
(当触发器发出时,如果缓冲区为空,则buffer将发出空数组)。
我们使用concatMap
来执行实际的工作,然后简单地释放缓冲区。
这是一个工作的StackBlitz演示。
所有的tap
都在那里,这不是很容易遵循,所以把它放在一个自定义操作符中可能是有益的:
function bufferConcatMap<T, R>(project: (a: T[]) => ObservableInput<R>) {
const release$ = new Subject<void>();
let releaseOnNextEmit = true;
return (source$: Observable<T>) => source$.pipe(
tap(() => {
if(releaseOnNextEmit) setTimeout(() => release$.next(), 0);
}),
buffer(release$),
tap(tasks => releaseOnNextEmit = tasks.length === 0),
filter(tasks => tasks.length > 0),
concatMap(project),
tap(() => release$.next())
);
}
const work$ = item$.pipe(
bufferConcatMap(tasks => processTasks(tasks))
);
这是另一个StackBlitz。