以异步方式处理管道中的数据



我有一个process函数,它以异步方式工作——它接收输入文本的行并对其进行处理(在下面的代码段中——为了简化——它从行中提取数字(。这些行来自输入的大文件(许多GB(,并被逐个读取并发送到管道(这是由下面的of运算符"模拟"的(。CCD_ 3的功能很慢,但读线很快,所以关键点是一行一行地读和处理线,以避免存储";堆栈溢出";。

const { of, Subject } = rxjs;
const { take, bufferCount, map, finalize } = rxjs.operators;

let source = of("line 11 of text", "line 22 of text", "line 33 of text") // large file with lines
.pipe(map(line => process(line, ()=> { 
// ???? how te callback/pipe shoud look ????
}))
);
source.subscribe(x=> console.log(x)); 
// expected result shoud be: 
// 11
// 22
// 33

// This is third-party library - I cannot change it
function process(line, callback) {
setTimeout(_=>{ 
let result = line.match(/d/)[0];
callback(result);
}, (1+Math.random()*9)*100); // random processing time
}
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.5.5/rxjs.umd.min.js" integrity="sha256-85uCh8dPb35sH3WK435rtYUKALbcvEQFC65eg+raeuc=" crossorigin="anonymous"></script>

我不知道怎么做——知道吗?

您需要将回调转换为Promise,因为Promise可以使用rxjs流。

import { of } from "rxjs";
import { concatMap } from "rxjs/operators";
const promiseWrapper = line => new Promise(resolve => process(line, resolve));
let source = of("line 11 of text", "line 22 of text", "line 33 of text").pipe(
concatMap(item => promiseWrapper(item))
);
source.subscribe(x => console.log(x));
// expected result shoud be:
// 11
// 22
// 33
// This is third-party library - I cannot change it
function process(line, callback) {
setTimeout(_ => {
let result = line.match(/d+/)[0];
callback(result);
}, 1000);
}

此外,我已经准备了stackblitz(或下面的片段(

const { of, Subject } = rxjs;
const { concatMap } = rxjs.operators;
const promiseWrapper = line => new Promise(resolve => process(line, resolve));
let source = of(
"line 11 of text", 
"line 22 of text", 
"line 33 of text") // large file with lines
.pipe(concatMap(item => promiseWrapper(item))
);
source.subscribe(x=> console.log(x)); 
// expected result shoud be: 
// 11
// 22
// 33

// This is third-party library - I cannot change it
function process(line, callback) {
setTimeout(_=>{ 
let result = line.match(/d+/)[0];
callback(result);
}, (1+Math.random()*9)*100);
}
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.5.5/rxjs.umd.min.js" integrity="sha256-85uCh8dPb35sH3WK435rtYUKALbcvEQFC65eg+raeuc=" crossorigin="anonymous"></script>

您可以使用bindCallback将类型为f(x, callback)的函数转换为函数g(x),该函数返回一个冷的可观察对象,该可观察对象发出传递给回调的结果。

import { bindCallback, of } from "rxjs";
import { concatMap } from "rxjs/operators";
const boundProcess = bindCallback(process);
let source = of("line 11 of text", "line 22 of text", "line 33 of text").pipe(
concatMap(line => boundProcess(line))
);
source.subscribe(x => console.log(x));
// expected result shoud be:
// 11
// 22
// 33
// This is third-party library - I cannot change it
function process(line, callback) {
setTimeout(_ => {
let result = line.match(/d+/)[0];
callback(result);
}, 1000);
}

https://stackblitz.com/edit/typescript-2rai4c?file=index.ts

最新更新