I have four I/O operations: A, B, C, and D. Each of them should be executed with vertx.executeBlocking. I need the following behavior:
//PSEUDOCODE
waitForExecuteBlocking(A_OPERATION);
thenWaitForAllExecuteBlocking(B_OPERATION, C_OPERATION, D_OPERATION);
/* do something */
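How can I achieve this behavior?
I couldn't find a solution in Vertx Rx. There are reasons why I don't want to wrap my *_OPERATION classes as worker verticles.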
I'm adding another answer, this time with Futures.
First, note that these are Vertx Futures, not regular Java Futures. Use the correct imports.
Now to the code:
import io.vertx.core.CompositeFuture;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

public class FuturesExample {

    // I'm running in main(), so everything is static, just for the sake of example
    private static Vertx vertx = Vertx.vertx();

    public static void main(String[] args) throws InterruptedException {
        long start = System.currentTimeMillis();
        // In your case it should be operationB(), operationC(), operationD()
        // But I wanted to make the code shorter
        CompositeFuture.all(operationA(), operationA(), operationA()).setHandler((r) -> {
            if (r.succeeded()) {
                // You can even iterate all the results
                List<String> results = r.result().list();
                for (String result : results) {
                    System.out.println(result);
                }
                // The elapsed time is still the max of the three operations, not their sum
                System.out.println("Took me " + (System.currentTimeMillis() - start) + " millis");
            } else {
                System.out.println("Something went wrong");
            }
        });
    }

    // Return a future, then fulfill it after some time
    private static Future<String> operationA() {
        Future<String> future = Future.future();
        long millis = 1000 + ThreadLocalRandom.current().nextInt(500);
        vertx.setTimer(millis, (l) -> {
            future.complete("All is good " + millis);
        });
        return future;
    }
}
I'll split my answer into two parts. It doesn't rely on RxJava, only on regular Java.
First, wait for A_OPERATION:
Vertx vertx = Vertx.vertx();
CountDownLatch latch = new CountDownLatch(1);
long start = System.currentTimeMillis();
vertx.deployVerticle(new AbstractVerticle() {
    @Override
    public void start() throws InterruptedException {
        // Just to demonstrate
        Thread.sleep(1000);
        latch.countDown();
    }
});
// Always use await with timeout
latch.await(2, TimeUnit.SECONDS);
System.out.println("Took me " + (System.currentTimeMillis() - start) + " millis");
Now for a more complex example:
public static void main(String[] args) throws InterruptedException {
    Vertx vertx = Vertx.vertx();
    // This should be equal to the number of operations to complete
    CountDownLatch latch = new CountDownLatch(3);
    long start = System.currentTimeMillis();
    // Start your operations
    vertx.deployVerticle(new BlockingVerticle(latch));
    vertx.deployVerticle(new BlockingVerticle(latch));
    vertx.deployVerticle(new BlockingVerticle(latch));
    // Always use await with timeout
    latch.await(2, TimeUnit.SECONDS);
    System.out.println("Took me " + (System.currentTimeMillis() - start) + " millis");
}

private static class BlockingVerticle extends AbstractVerticle {

    private final CountDownLatch latch;

    public BlockingVerticle(CountDownLatch latch) {
        this.latch = latch;
    }

    @Override
    public void start() throws InterruptedException {
        long millis = 1000 + ThreadLocalRandom.current().nextInt(500);
        System.out.println("It will take me " + millis + " to complete");
        // Wait for some random time, but no longer than 1.5 seconds
        Thread.sleep(millis);
        latch.countDown();
    }
}
Note that the main thread will be blocked for max(B_OPERATION, C_OPERATION, D_OPERATION) plus a few milliseconds.
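The two parts can be combined into the sequence from the question: one latch for A, then a second latch counting B, C, and D. Below is a minimal sketch that uses only java.util.concurrent so it runs without Vert.x; the thread pool is a stand-in for wherever the blocking work actually runs, and LatchSequence, runAll, and blockingOperation are hypothetical names invented for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class LatchSequence {

    // Runs A and waits for it, then runs B, C and D in parallel and waits for all three.
    // Returns the operation names in the order they finished.
    static List<String> runAll() {
        // Assumption: a plain thread pool stands in for the threads doing blocking work
        ExecutorService pool = Executors.newFixedThreadPool(3);
        List<String> finished = new CopyOnWriteArrayList<>();
        try {
            // Stage 1: a latch of size 1 for A
            CountDownLatch aDone = new CountDownLatch(1);
            pool.execute(() -> {
                blockingOperation("A", finished);
                aDone.countDown();
            });
            // Always use await with timeout
            if (!aDone.await(2, TimeUnit.SECONDS)) {
                throw new IllegalStateException("A did not finish in time");
            }

            // Stage 2: the latch size equals the number of operations to wait for
            CountDownLatch restDone = new CountDownLatch(3);
            for (String name : Arrays.asList("B", "C", "D")) {
                pool.execute(() -> {
                    blockingOperation(name, finished);
                    restDone.countDown();
                });
            }
            if (!restDone.await(2, TimeUnit.SECONDS)) {
                throw new IllegalStateException("B, C, D did not all finish in time");
            }
            return finished;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    // Hypothetical blocking call: sleeps briefly, then records that it finished
    static void blockingOperation(String name, List<String> finished) {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        finished.add(name);
    }

    public static void main(String[] args) {
        System.out.println(runAll());
    }
}
```

A always appears first in the returned list; the relative order of B, C, and D depends on scheduling.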
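Using composite futures will help solve your problem. CompositeFuture offers several functions, such as CompositeFuture.join and CompositeFuture.all, to which a list of futures can be passed.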
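To try the "A first, then all of B, C, D" composition without Vert.x on the classpath, here is a rough analogy using the JDK's CompletableFuture. This is not the Vert.x API: CompletableFuture.allOf is a JDK method that only plays a role similar to a composite future here, and ComposedSequence, runAll, and operation are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;

class ComposedSequence {

    // Hypothetical stand-in for a blocking I/O call
    static String operation(String name) {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return name + " done";
    }

    // Runs A, then starts B, C and D together and combines all four results
    static String runAll() {
        CompletableFuture<String> all = CompletableFuture
                .supplyAsync(() -> operation("A"))
                .thenCompose(a -> {
                    // These three start only after A has completed
                    CompletableFuture<String> b = CompletableFuture.supplyAsync(() -> operation("B"));
                    CompletableFuture<String> c = CompletableFuture.supplyAsync(() -> operation("C"));
                    CompletableFuture<String> d = CompletableFuture.supplyAsync(() -> operation("D"));
                    // allOf completes once b, c and d have all completed,
                    // so the join() calls below do not block
                    return CompletableFuture.allOf(b, c, d)
                            .thenApply(ignored -> a + ", " + b.join() + ", " + c.join() + ", " + d.join());
                });
        // Blocks the caller; acceptable on a main thread, not on an event loop
        return all.join();
    }

    public static void main(String[] args) {
        System.out.println(runAll());
    }
}
```

In Vert.x itself the second stage would be a CompositeFuture built from the three operation futures, started from the handler of A's future.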