我试图在rxj上创建类似事件循环的东西,我使用firstValueFrom
作为门,等待所有事件在进一步处理之前。我们的目标是让一个nodejs服务运行并对各种事件做出反应,处理这些事件,并能够在命令时优雅地关闭。
我可以看到我无法向自己解释的行为-当退出条件可以满足时-一切都如预期的那样工作:事件由发行者发出并由处理程序处理。
然而,当我删除退出事件出现的可能性时-代码在rx.firstValueFrom
调用后立即退出。
代码:
import * as rx from "rxjs";
import * as op from "rxjs/operators";
async function foo(): Promise<string> {
console.log("1");
const s = new rx.ReplaySubject<string>();
const t = rx.timer(1000)
.pipe(
op.take(3),
op.map(x => x.toString()),
op.endWith("exit"),
);
const exitObserver = s.asObservable()
.pipe(
op.mergeWith(t),
op.filter(x => x === "exit")
);
console.log("2");
const firstValue = await rx.firstValueFrom(exitObserver);
console.log("3");
return firstValue;
}
foo()
.then(x => console.log(`result: ${x}`))
.catch(e => console.error(e))
.finally(() => console.log('finally'))
输出:
1
2
3
result: exit
finally
使用预期的无限循环修改了代码("exit"事件被注释掉):
import * as rx from "rxjs";
import * as op from "rxjs/operators";
async function foo(): Promise<string> {
console.log("1");
const s = new rx.ReplaySubject<string>();
const t = rx.timer(1000)
.pipe(
op.take(3),
op.map(x => x.toString()),
//op.endWith("exit"),
);
const exitObserver = s.asObservable()
.pipe(
op.mergeWith(t),
op.filter(x => x === "exit")
);
console.log("2");
const firstValue = await rx.firstValueFrom(exitObserver);
console.log("3");
return firstValue;
}
foo()
.then(x => console.log(`result: ${x}`))
.catch(e => console.error(e))
.finally(() => console.log('finally'))
输出:
1
2
这是意料之外的。我希望这段代码可以无限期地等待"退出事件"。没有错误消息。我使用typescript 4.3.5, node v14.15.4, RxJs 7.4.0。
我的问题是:
- 为什么修改后的代码不会进入无限循环等待不存在的消息?
- 如何创建一个无限循环与RxJs?
-
它不能把3写入输出,因为它还在等待exitObservable的第一个值。你有一个过滤器,所以它永远不会发生。在RxJS世界里,无限循环这个术语可能会误导人。
-
您可以使用
takeUntil
来实现您的目标。
const {Subject} = rxjs;
const {filter, takeUntil} = rxjs.operators;
const actions$ = new Subject();
actions$
.pipe(
filter(action => action !== 'exit'),
takeUntil(actions$.pipe(
filter(action => action === 'exit')
))
)
.subscribe({
next: action => console.log(`result: ${action}`),
error: error => console.error(e),
complete: () => console.log('exit'),
});
actions$.next('first');
actions$.next('second');
actions$.next('exit');
<script src="https://unpkg.com/rxjs@^7/dist/bundles/rxjs.umd.min.js"></script>
- 为什么修改后的代码不会进入无限循环等待不存在的消息?
。这就是为什么
console.log("3");
return firstValue;
永远不会运行。你从不记录结果,等等。
如何创建一个无限循环与RxJs?
不知道你是什么意思。下面的代码将永远每5秒打印一个数字。这就是你所说的indefinite loop
吗?
interval(5000).subscribe(console.log);