绿色.x从vertex . executeblocking中做出承诺



我有四个I/O操作:A, B, CD。每一个都应该用vertx.executeBlocking执行。我应该有下一个行为:

//PSEUDOCODE
waitForExecuteBlocking(A_OPERATION);
thenWaitForAllExecuteBlocking(`B_OPERATION`, `C_OPERATION`, `D_OPERATION`)
/* do something */

如何实现此行为?

我找不到在顶点Rx的解决方案。有原因,我不想包装我的*_OPERATION类作为工人的顶点。

我要添加另一个答案,这次是期货。
首先请注意,这些是Vertx期货,而不是常规的Java期货。使用正确的导入。
现在进入代码:

// I'm running in main(), so everything is static, just for the sake of example
private static Vertx vertx = Vertx.vertx();
public static void main(String[] args) throws InterruptedException {

    long start = System.currentTimeMillis();
    // In your case it should be operationA(), operationB(), etc
    // But I wanted to make the code shorter
    CompositeFuture.all(operationA(), operationA(), operationA()).setHandler((r) -> {
        if (r.succeeded()) {
            // You can even iterate all the results
            List<String> results = r.result().list();
            for (String result : results) {
                System.out.println(result);
            }
            // This will still print max(operationA, operationB, operationC)
            System.out.println("Took me " + (System.currentTimeMillis() - start) + " millis");
        }
        else {
            System.out.println("Something went wrong");
        }
    });
}

// Return a future, then fulfill it after some time
private static Future<String> operationA() {
    Future<String> future = Future.future();
    long millis = 1000 + ThreadLocalRandom.current().nextInt(500);
    vertx.setTimer(millis, (l) -> {
        future.complete("All is good " + millis);
    });
    return future;
}

我将把我的答案分成两部分。它不依赖于RxJava,而只依赖于常规Java。
首先,等待A_OPERATION

    Vertx vertx = Vertx.vertx();
    CountDownLatch latch = new CountDownLatch(1);
    Long start = System.currentTimeMillis();
    vertx.deployVerticle(new AbstractVerticle() {
        @Override
        public void start() throws InterruptedException {
            // Just to demonstrate
            Thread.sleep(1000);
            latch.countDown();
        }
    });
    // Always use await with timeout
    latch.await(2, TimeUnit.SECONDS);
    System.out.println("Took me " + (System.currentTimeMillis() - start) + " millis");

现在来看一个更复杂的例子:

public static void main(String[] args) throws InterruptedException {
    Vertx vertx = Vertx.vertx();
    // This should be equal to number of operations to complete
    CountDownLatch latch = new CountDownLatch(3);
    Long start = System.currentTimeMillis();
    // Start your operations
    vertx.deployVerticle(new BlockingVerticle(latch));
    vertx.deployVerticle(new BlockingVerticle(latch));
    vertx.deployVerticle(new BlockingVerticle(latch));
    // Always use await with timeout
    latch.await(2, TimeUnit.SECONDS);
    System.out.println("Took me " + (System.currentTimeMillis() - start) + " millis");
}

private static class BlockingVerticle extends AbstractVerticle {
    private final CountDownLatch latch;
    public BlockingVerticle(CountDownLatch latch) {
        this.latch = latch;
    }
    @Override
    public void start() throws InterruptedException {

        long millis = 1000 + ThreadLocalRandom.current().nextInt(500);
        System.out.println("It will take me " + millis + " to complete");
        // Wait for some random time, but no longer that 1.5 seconds
        Thread.sleep(millis);
        latch.countDown();
    }
}

你应该注意主线程将被阻塞max(B_OPERATION, C_OPERATION, D_OPERATION) +几毫秒。

使用Composite Futures将有助于解决您的问题,有许多compositeFutures功能如compositeFuture。加入和综合期货。所有的期货列表都可以传递

相关内容

  • 没有找到相关文章

最新更新