,假设我有两个发现了特殊文件名的垂直(例如,可能是任何东西(,然后将它们发布到 event Bus 例如。一个是从REST API读取名称,另一个是文件系统中的:
scanrestverticle.java
/**
* Reads file names through an REST API
*/
public class ScanRestVerticle extends AbstractVerticle {
@Override
public void start() throws Exception {
HttpClientRequest req = vertx.createHttpClient().request(HttpMethod.GET, "BASE_URL", "URL");
req.toObservable()
.flatMap(HttpClientResponse::toObservable)
.lift(unmarshaller(Model.class))
.subscribe(c -> vertx.eventBus().publish("address", c.specialName()));
req.exceptionHandler(Throwable::printStackTrace);
req.end();
}
}
scanfsverticle.java
/**
* Reads file names from a file
*/
public class ScanFsVerticle.java extends AbstractVerticle {
@Override
public void start() throws Exception {
StringObservable.byLine(vertx.fileSystem()
.rxReadFile("myFileNames.txt")
.map(Buffer::toString)
.toObservable())
.subscribe(c -> vertx.eventBus().publish("address", c), e -> System.err.println(e.getMessage()));
}
}
一切都很好,但是,现在,我有一个垂直的垂直,将这些名称组合在事件总线中,然后在std.out上打印它们:
printverticle.java
public class PrintVerticle extends AbstractVerticle {
@Override
public void start() throws Exception {
vertx.eventBus()
.<JsonObject>consumer("address")
.bodyStream()
.toObservable()
.reduce(new JsonArray(), JsonArray::add)
.subscribe(j -> System.out.println(j.toString()));
}
问题是 REDAD 在这里从未真正完成,因为事件总线正如我所想的那样进行无限流。
那么,我如何实际完成此操作并打印两个垂体发布的名称?
注意:我真的在vert.x和rx中是新手
预先感谢。
编辑:我可以在此处调用scan()
,而不是reduce()
,以获取中间结果,但是如何获得scan().last()
?
您假设正确地假设reduce()
操作员依靠onComplete()
事件知道在哪里停止收集。
虽然scan()
是累加器,它会为每个新发射流的累积值,因此扫描将为您提供中间结果。
这里的问题是,您使用的是EventBus
,从概念上讲,EventBus
是无限的流,因此onComplete()
绝不应被调用。从技术上讲,当您close()
``eventbus''时可能会称呼它,但我想您不应该也不想依靠它。
-
如果您想使用EventBus,则应以某种方式结束
PrintVerticle
流,因此您将拥有onComplete()
事件。例如,您可以使用EventBus
来发布一些事件,这些事件发布了两个观察到的顶点结束的两个事件。然后,您可以将其拉动并在这两个事件发出之后在PrintVerticle
上结束流(使用takeUntil()
(。
这样的东西:Observable vertice1EndStream = vertx.eventBus() .<JsonObject>consumer("vertice1_end") .bodyStream() .toObservable(); Observable vertice2EndStream = vertx.eventBus() .<JsonObject>consumer("vertice2_end") .bodyStream() .toObservable() Observable bothVerticesEndStream = Observable.zip(vertice1EndStream, vertice2EndStream, (vertice1, vertice1) -> new Object()); vertx.eventBus() .<JsonObject>consumer("address") .bodyStream() .toObservable() .takeUntil(bothVerticesEndStream) .reduce(new JsonArray(), JsonArray::add) .subscribe(j -> System.out.println(j.toString()));
-
另一个选项是直接使用2个流而不是
EventBus
。将它们合并在一起,然后使用reduce()
,因为那两个流是有限的,您不会有问题。