创建动态长度的可观察循环



我有一个函数,它根据字符串数组输入给我下一个单词,并追加数组:

getNextWordAndAppend(stringArray: string[]) {
// parse
return this.title.getNextWord(word).pipe(
map((result: {}[]) => {
const nextWord = // next word from objectArray
stringArray.push(nextWord ? nextWord.toString() : "")
return stringArray.filter(item => // some filtering logic)
})
)
}

假设我想链接这个函数,并将前一个可观察对象的值传递给下一个,以创建一个句子:

initialize() {
this.formControl.valueChanges.pipe(
// parse input
concatMap((stringArray: string[]) => this.getNextWordAndAppend(stringArray)),
concatMap((stringArray: string[]) => this.getNextWordAndAppend(stringArray)),
concatMap((stringArray: string[]) => this.getNextWordAndAppend(stringArray)),
// parse output
).subscribe()
}

假设我想动态地确定这个最终的可观察对象的迭代,我如何创建一个函数像下面?它接受一个Observable作为输入,也应该返回一个。

伪代码:

//
.pipe(
// Initial value, Observable to loop, and number of iterations 
concatAndPassResult(["Hello", "world"], this.getNextWordAndAppend, 50)
)

concatAndPassResult会是什么样子?关于reduce的事?

您可以使用expand递归运行async observable并附加结果。take可以在指定的执行次数下完成流。

因为初始值来自一个可观察的源,所以我改变了你的函数签名。

getWords这里是模拟一个async调用返回随机字

import { from, of, timer } from "rxjs";
import { map, takeWhile, take, tap, expand } from "rxjs/operators";
const getWords = timer(1000).pipe(map(() => "someword" + Math.random()));
const concatAndPassResult = (stream, count) => (source) => {
return source.pipe(
expand((value) => stream.pipe(map((v) => [...value, v]))),
take(count)
);
};
of(["Hello", "world"])
.pipe(concatAndPassResult(getWords, 40))
.subscribe(console.log);

最新更新