老式的回调同步使用可观测值



我有以下场景-4个回调函数A、B、C、D,它们被称为旧式库(内部使用一些API请求,因此执行时间未知/随机,但结果的正确顺序(按完成任务时间(对我来说很重要(-我希望使用rxjs将它们返回的数据同步到一个可观察的结果字符串。

function getData() {
// --- BELOW Part can be EDIT ---
let obs = new ReplaySubject(1); // this is example you can use an type
function A(n) { 
let r= 'A'.repeat(n);
}
function B(n) {
let r= 'B'.repeat(n);
}
function C(n) {
let r= 'C'.repeat(n);
}
function D(n) {
let r= 'D'.repeat(n);
obs.next(r);
}

// --- BELOW Part can NOT be edit ---
runLib(A,B,C,D)   
return obs
}

在下面的代码段中,finalResult的值是DDDDD,这是错误的。finalResult字符串的正确值应该是AADDDDDCCCCBBB

// SET-UP - NOT EDIT Below code
const { of, Observable, ReplaySubject } = rxjs;
const { map, switchMap, delay } = rxjs.operators; // example
// simulated lib functions
function libA(callback) { setTimeout( _=>callback(2), 1000); } 
function libB(callback) { setTimeout( _=>callback(3), 3000); }
function libC(callback) { setTimeout( _=>callback(4), 2000); }
function libD(callback) { setTimeout( _=>callback(5), 1500); }
function runLib(cA,cB,cC,cD) {
libA( cA ); libB( cB ); libC( cC ); libD( cD );
}
getData().subscribe(finalResult => {
console.log(finalResult) // The result is WRONG here!
}, e=>{}, _=> console.log('finished - unsubscribed'));

function getData() {
// --- BELOW Part can be EDIT ---
let obs = new ReplaySubject(1); // this is example, you can use diffrend observale kind
function A(n) { 
let r= 'A'.repeat(n);
}
function B(n) {
let r= 'B'.repeat(n);
}
function C(n) {
let r= 'C'.repeat(n);
}
function D(n) {
let r= 'D'.repeat(n);
obs.next(r);
}

// --- BELOW Part can NOT be edit ---
runLib(A,B,C,D)   
return obs
}
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.5.5/rxjs.umd.min.js" integrity="sha256-85uCh8dPb35sH3WK435rtYUKALbcvEQFC65eg+raeuc=" crossorigin="anonymous"></script>

在代码片段中,我在getData()中标记了代码,可以在解决方案中编辑(可能看起来有点尴尬,但这正是我需要的((你也可以在那里找到finalResult,但不能编辑代码的那部分(。有可能吗?怎么做?

在这种情况下,最好的做法是包装库函数以返回Observable,然后使用forkJoin等待所有结果。

我拿走了你的代码并对其进行了修改以获得所需的结果,你需要:

  1. 在每个回调中都会将结果发送给一个主题
  2. 返回等待n发射的Observable-在本例中为4
  3. 将排放映射到单个字符串中

最终的getData函数如下所示:

function getData() {
// --- BELOW Part can be EDIT ---
const result$: Subject<string> = new Subject<string>();
const obs = result$.asObservable().pipe(
bufferCount(4), // or any desired number of callback
map((results: string[]) => results.join(''))
);
function A(n) {
let r = "A".repeat(n);
result$.next(r);
}
function B(n) {
let r = "B".repeat(n);
result$.next(r);
}
function C(n) {
let r = "C".repeat(n);
result$.next(r);
}
function D(n) {
let r = "D".repeat(n);
result$.next(r);
}
// --- BELOW Part can NOT be edit ---
runLib(A, B, C, D);
return obs;
}

你可以在这个stackblitz中找到完整的代码,或者在代码段下面运行

// SET-UP - NOT EDIT Below code
const { Subject } = rxjs;
const { take, bufferCount, map } = rxjs.operators; // example
// simulated lib functions
function libA(callback) { setTimeout( _=>callback(2), 1000); } 
function libB(callback) { setTimeout( _=>callback(3), 3000); }
function libC(callback) { setTimeout( _=>callback(4), 2000); }
function libD(callback) { setTimeout( _=>callback(5), 1500); }
function runLib(cA,cB,cC,cD) {
libA( cA ); libB( cB ); libC( cC ); libD( cD );
}
getData().subscribe(finalResult => {
console.log(finalResult) // The result is WRONG here!
}, e=>{},_=> console.log('finished - unsubscribed'));

function getData() {
// --- BELOW Part can be EDIT ---
const result$ = new Subject();
function A(n) {
let r = "A".repeat(n);
result$.next(r);
}
function B(n) {
let r = "B".repeat(n);
result$.next(r);
}
function C(n) {
let r = "C".repeat(n);
result$.next(r);
}
function D(n) {
let r = "D".repeat(n);
result$.next(r);
}
const obs = result$.pipe(
bufferCount(4), // or any desired number of callback
take(1),
map(results=> results.join``)
);
// --- BELOW Part can NOT be edit ---
runLib(A, B, C, D);
return obs;
}
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.5.5/rxjs.umd.min.js" integrity="sha256-85uCh8dPb35sH3WK435rtYUKALbcvEQFC65eg+raeuc=" crossorigin="anonymous"></script>

以下内容如何:(我喜欢@Tal Ohana's的答案,但他的解决方案中的主题永远不会打赌取消订阅,这可能会导致内存不足(

function getData() {
let obs = new Subject<string>();
function A(n: number) {
let r = 'A'.repeat(n);
obs.next(r);
}
function B(n: number) {
let r = 'B'.repeat(n);
obs.next(r);
}
function C(n: number) {
let r = 'C'.repeat(n);
obs.next(r);
}
function D(n: number) {
let r = 'D'.repeat(n);
obs.next(r);
}
runLib(A, B, C, D)
return obs.pipe(
scan((acc, value) => acc + value),
take(4),
last()
)
}