终止并从活动总线获得结果



,假设我有两个发现了特殊文件名的垂直(例如,可能是任何东西(,然后将它们发布到 event Bus 例如。一个是从REST API读取名称,另一个是文件系统中的:

scanrestverticle.java

/**
 * Reads file names through an REST API
 */
public class ScanRestVerticle extends AbstractVerticle {
    @Override
    public void start() throws Exception {
        HttpClientRequest req = vertx.createHttpClient().request(HttpMethod.GET, "BASE_URL", "URL");
        req.toObservable()
           .flatMap(HttpClientResponse::toObservable)
           .lift(unmarshaller(Model.class))
           .subscribe(c -> vertx.eventBus().publish("address", c.specialName()));
        req.exceptionHandler(Throwable::printStackTrace);
        req.end();
    }
}

scanfsverticle.java

/**
 * Reads file names from a file
 */
public class ScanFsVerticle.java extends AbstractVerticle {
    @Override
    public void start() throws Exception {
        StringObservable.byLine(vertx.fileSystem()
            .rxReadFile("myFileNames.txt")
            .map(Buffer::toString)
            .toObservable())
            .subscribe(c -> vertx.eventBus().publish("address", c), e -> System.err.println(e.getMessage()));
    }
}

一切都很好,但是,现在,我有一个垂直的垂直,将这些名称组合在事件总线中,然后在std.out上打印它们:

printverticle.java

public class PrintVerticle extends AbstractVerticle {
    @Override
    public void start() throws Exception {
        vertx.eventBus()
            .<JsonObject>consumer("address")
            .bodyStream()
            .toObservable()
            .reduce(new JsonArray(), JsonArray::add)
            .subscribe(j -> System.out.println(j.toString()));
}

问题是 REDAD 在这里从未真正完成,因为事件总线正如我所想的那样进行无限流。

那么,我如何实际完成此操作并打印两个垂体发布的名称?

注意:我真的在vert.x和rx中是新手

预先感谢。

编辑:我可以在此处调用scan(),而不是reduce(),以获取中间结果,但是如何获得scan().last()

您假设正确地假设reduce()操作员依靠onComplete()事件知道在哪里停止收集。
虽然scan()是累加器,它会为每个新发射流的累积值,因此扫描将为您提供中间结果。
这里的问题是,您使用的是EventBus,从概念上讲,EventBus是无限的流,因此onComplete()绝不应被调用。从技术上讲,当您close()``eventbus''时可能会称呼它,但我想您不应该也不想依靠它。

  • 如果您想使用EventBus,则应以某种方式结束PrintVerticle流,因此您将拥有onComplete()事件。例如,您可以使用EventBus来发布一些事件,这些事件发布了两个观察到的顶点结束的两个事件。然后,您可以将其拉动并在这两个事件发出之后在PrintVerticle上结束流(使用takeUntil()(。
    这样的东西:

    Observable vertice1EndStream = vertx.eventBus()
        .<JsonObject>consumer("vertice1_end")
        .bodyStream()
        .toObservable();
    Observable vertice2EndStream = vertx.eventBus()
        .<JsonObject>consumer("vertice2_end")
        .bodyStream()
        .toObservable()
    Observable bothVerticesEndStream = Observable.zip(vertice1EndStream, vertice2EndStream, (vertice1, vertice1) -> new Object());
    vertx.eventBus()
        .<JsonObject>consumer("address")
        .bodyStream()
        .toObservable()
        .takeUntil(bothVerticesEndStream)
        .reduce(new JsonArray(), JsonArray::add)
        .subscribe(j -> System.out.println(j.toString()));
    
  • 另一个选项是直接使用2个流而不是EventBus。将它们合并在一起,然后使用reduce(),因为那两个流是有限的,您不会有问题。

最新更新