使用 zip 和 last(默认值)的发射器处理器永远不会发出值



我正在尝试将ziplast运算符一起使用,以组合三种不同的EmitterProcessor并处理组合结果。这在下面定义下完美地工作,除非其中一个处理器不发出项目的情况。我对last(defaultValue)运算符的理解是,它会在EmitterProcessor完成时发出默认值,而不会发出值。这反过来应该完成zip,并发送默认值以及来自其他处理器的任何实际值。问题是在所有处理器上执行complete调用后,zip 永远不会完成。我是否遗漏了某些内容,或者反应堆代码中的某处存在错误?

EmitterProcessor.zip(
emitter1.last("Missing Value 1"),
emitter2.last("Missing Value 2"),
emitter3.last("Missing Value 3")
).subscribe {
logger.error(it.t1)
logger.error(it.t2)
logger.error(it.t3)
}
// Later in the code I call complete on all emitters
emitter1.sink().complete()
emitter2.sink().complete()
emitter3.sink().complete()

也许 combineLatest 会解决你的问题。请注意,您至少应向每个处理器发出 1 个值

EmitterProcessor<String> emitter1= EmitterProcessor.create();
EmitterProcessor<String> emitter2= EmitterProcessor.create();
EmitterProcessor<String> emitter3= EmitterProcessor.create();
EmitterProcessor.zip(
emitter1.defaultIfEmpty("Missing Value 1"),
emitter2.defaultIfEmpty("Missing Value 2"),
emitter3.defaultIfEmpty("Missing Value 3")
).log();
EmitterProcessor.combineLatest(emitter1, emitter2, emitter3, objects -> {
return objects;
}).log().subscribe();
emitter1.onNext("a");
emitter2.onNext("a");
emitter3.onNext("a");
emitter1.onNext("a");
emitter1.onNext("a");
emitter1.onNext("a");
emitter1.onNext("a");
emitter1.onNext("a");
// Later in the code I call complete on all emitters
emitter1.sink().complete();
emitter2.sink().complete();
emitter3.sink().complete();

最新更新