rxjs firstValueFrom never resolve



我试图在rxj上创建类似事件循环的东西,我使用firstValueFrom作为门,等待所有事件在进一步处理之前。我们的目标是让一个nodejs服务运行并对各种事件做出反应,处理这些事件,并能够在命令时优雅地关闭。

我可以看到我无法向自己解释的行为-当退出条件可以满足时-一切都如预期的那样工作:事件由发行者发出并由处理程序处理。

然而,当我删除退出事件出现的可能性时-代码在rx.firstValueFrom调用后立即退出。

代码:

import * as rx from "rxjs";
import * as op from "rxjs/operators";
async function foo(): Promise<string> {
console.log("1");
const s = new rx.ReplaySubject<string>();
const t = rx.timer(1000)
.pipe(
op.take(3),
op.map(x => x.toString()),
op.endWith("exit"),
);
const exitObserver = s.asObservable()
.pipe(
op.mergeWith(t),
op.filter(x => x === "exit")
);
console.log("2");
const firstValue = await rx.firstValueFrom(exitObserver);
console.log("3");
return firstValue;
}
foo()
.then(x => console.log(`result: ${x}`))
.catch(e => console.error(e))
.finally(() => console.log('finally'))

输出:

1
2
3
result: exit
finally

使用预期的无限循环修改了代码("exit"事件被注释掉):

import * as rx from "rxjs";
import * as op from "rxjs/operators";
async function foo(): Promise<string> {
console.log("1");
const s = new rx.ReplaySubject<string>();
const t = rx.timer(1000)
.pipe(
op.take(3),
op.map(x => x.toString()),
//op.endWith("exit"),
);
const exitObserver = s.asObservable()
.pipe(
op.mergeWith(t),
op.filter(x => x === "exit")
);
console.log("2");
const firstValue = await rx.firstValueFrom(exitObserver);
console.log("3");
return firstValue;
}
foo()
.then(x => console.log(`result: ${x}`))
.catch(e => console.error(e))
.finally(() => console.log('finally'))

输出:

1
2

这是意料之外的。我希望这段代码可以无限期地等待"退出事件"。没有错误消息。我使用typescript 4.3.5, node v14.15.4, RxJs 7.4.0。

我的问题是:

  1. 为什么修改后的代码不会进入无限循环等待不存在的消息?
  2. 如何创建一个无限循环与RxJs?
  1. 它不能把3写入输出,因为它还在等待exitObservable的第一个值。你有一个过滤器,所以它永远不会发生。在RxJS世界里,无限循环这个术语可能会误导人。

  2. 您可以使用takeUntil来实现您的目标。

const {Subject} = rxjs;
const {filter, takeUntil} = rxjs.operators;
const actions$ = new Subject();
actions$
.pipe(
filter(action => action !== 'exit'),
takeUntil(actions$.pipe(
filter(action => action === 'exit')
))
)
.subscribe({
next: action => console.log(`result: ${action}`),
error: error => console.error(e),
complete: () => console.log('exit'),
});

actions$.next('first');
actions$.next('second');
actions$.next('exit');
<script src="https://unpkg.com/rxjs@^7/dist/bundles/rxjs.umd.min.js"></script>

  1. 为什么修改后的代码不会进入无限循环等待不存在的消息?

。这就是为什么

console.log("3");
return firstValue;

永远不会运行。你从不记录结果,等等。


如何创建一个无限循环与RxJs?

不知道你是什么意思。下面的代码将永远每5秒打印一个数字。这就是你所说的indefinite loop吗?

interval(5000).subscribe(console.log);

相关内容

  • 没有找到相关文章

最新更新