在RxJS Observable的onNext中等待异步操作



我有一个RxJS序列正在以正常方式使用。。。

然而,在可观察的"onNext"处理程序中,一些操作将同步完成,但其他操作需要异步回调,在处理输入序列中的下一项之前需要等待。

有点困惑该怎么做。有什么想法吗?谢谢

someObservable.subscribe(
function onNext(item)
{
if (item == 'do-something-async-and-wait-for-completion')
{
setTimeout(
function()
{
console.log('okay, we can continue');
}
, 5000
);
}
else
{
// do something synchronously and keep on going immediately
console.log('ready to go!!!');
}
},
function onError(error)
{
console.log('error');
},
function onComplete()
{
console.log('complete');
}
);

您想要执行的每个操作都可以建模为可观察的。甚至同步操作也可以用这种方式建模。然后可以使用map将序列转换为序列序列,然后使用concatAll将序列压平。

someObservable
.map(function (item) {
if (item === "do-something-async") {
// create an Observable that will do the async action when it is subscribed
// return Rx.Observable.timer(5000);
// or maybe an ajax call?  Use `defer` so that the call does not
// start until concatAll() actually subscribes.
return Rx.Observable.defer(function () { return Rx.Observable.ajaxAsObservable(...); });
}
else {
// do something synchronous but model it as an async operation (using Observable.return)
// Use defer so that the sync operation is not carried out until
// concatAll() reaches this item.
return Rx.Observable.defer(function () {
return Rx.Observable.return(someSyncAction(item));
});
}
})
.concatAll() // consume each inner observable in sequence
.subscribe(function (result) {
}, function (error) {
console.log("error", error);
}, function () {
console.log("complete");
});

要回复您的一些评论。。。在某个时刻,您需要对函数流强制执行一些期望。在大多数语言中,当处理可能是异步的函数时,函数签名是异步的,并且函数的实际异步与同步特性被隐藏为函数的实现细节。无论您使用的是javaScript promise、Rx-observables、c#Tasks、c++Futures等,都是如此。函数最终返回promise/observable/task/future等,如果函数实际上是同步的,那么它返回的对象刚刚完成。

话虽如此,由于这是JavaScript,您可以作弊:

var makeObservable = function (func) {
return Rx.Observable.defer(function () {
// execute the function and then examine the returned value.
// if the returned value is *not* an Rx.Observable, then
// wrap it using Observable.return
var result = func();
return result instanceof Rx.Observable ? result: Rx.Observable.return(result);
});
}
someObservable
.map(makeObservable)
.concatAll()
.subscribe(function (result) {
}, function (error) {
console.log("error", error);
}, function () {
console.log("complete");
});

首先,将异步操作移出subscribe,它不是为异步操作而设计的。

您可以使用mergeMap(别名flatMap)或concatMap。(我提到了这两个,但concatMap实际上是mergeMapconcurrent参数设置为1。)设置不同的并发参数很有用,因为有时您希望限制并发查询的数量,但仍然同时运行几个。

source.concatMap(item => {
if (item == 'do-something-async-and-wait-for-completion') {
return Rx.Observable.timer(5000)
.mapTo(item)
.do(e => console.log('okay, we can continue'));
} else {
// do something synchronously and keep on going immediately
return Rx.Observable.of(item)
.do(e => console.log('ready to go!!!'));
}
}).subscribe();

我还将展示如何限制您的通话费率建议:仅在您实际需要时进行速率限制,例如调用每秒或分钟仅允许特定数量请求的外部API。否则,最好只限制并发操作的数量,让系统以最大速度移动。

我们从以下片段开始:

const concurrent;
const delay;
source.mergeMap(item =>
selector(item, delay)
, concurrent)

接下来,我们需要为concurrentdelay选择值,并实现selector。CCD_ 13和CCD_。例如,如果我们想每秒运行10个项目,我们可以使用concurrent = 10delay = 1000(毫秒),也可以使用concurrent = 5delay = 500concurrent = 4delay = 400。每秒的项目数将始终为concurrent / (delay / 1000)

现在让我们实现selector。我们有几个选择。我们可以为selector设置最短的执行时间,我们可以为其添加恒定的延迟,我们可以在结果可用时立即发出结果,我们只能在经过最短的延迟后发出结果等。甚至可以使用timeout运算符添加超时。方便

设置最短时间,提前发送结果:

function selector(item, delay) {
return Rx.Observable.of(item)
.delay(1000) // replace this with your actual call.
.merge(Rx.Observable.timer(delay).ignoreElements())
}

设置最短时间,延迟发送结果:

function selector(item, delay) {
return Rx.Observable.of(item)
.delay(1000) // replace this with your actual call.
.zip(Rx.Observable.timer(delay), (item, _))
}

添加时间,提前发送结果:

function selector(item, delay) {
return Rx.Observable.of(item)
.delay(1000) // replace this with your actual call.
.concat(Rx.Observable.timer(delay).ignoreElements())
}

添加时间,延迟发送结果:

function selector(item, delay) {
return Rx.Observable.of(item)
.delay(1000) // replace this with your actual call.
.delay(delay)
}

另一个进行手动异步操作的简单示例。

请注意,这不是一个好的被动练习!如果您只想等待1000ms,请使用Rx.Observable.timer或延迟运算符。

someObservable.flatMap(response => {
return Rx.Observable.create(observer => {
setTimeout(() => {
observer.next('the returned value')
observer.complete()
}, 1000)
})
}).subscribe()

现在,将setTimeout替换为异步函数,如Image.onload或fileReader.onload…

最新更新