Я сломаю свой ответ на две части. Это не будет зависеть от RxJava, но только от обычной Java.
Во-первых, ждать A_OPERATION
Vertx vertx = Vertx.vertx();
CountDownLatch latch = new CountDownLatch(1);
Long start = System.currentTimeMillis();
vertx.deployVerticle(new AbstractVerticle() {
@Override
public void start() throws InterruptedException {
// Just to demonstrate
Thread.sleep(1000);
latch.countDown();
}
});
// Always use await with timeout
latch.await(2, TimeUnit.SECONDS);
System.out.println("Took me " + (System.currentTimeMillis() - start) + " millis");
Теперь к более сложному примеру:
public static void main(String[] args) throws InterruptedException {
Vertx vertx = Vertx.vertx();
// This should be equal to number of operations to complete
CountDownLatch latch = new CountDownLatch(3);
Long start = System.currentTimeMillis();
// Start your operations
vertx.deployVerticle(new BlockingVerticle(latch));
vertx.deployVerticle(new BlockingVerticle(latch));
vertx.deployVerticle(new BlockingVerticle(latch));
// Always use await with timeout
latch.await(2, TimeUnit.SECONDS);
System.out.println("Took me " + (System.currentTimeMillis() - start) + " millis");
}
private static class BlockingVerticle extends AbstractVerticle {
private final CountDownLatch latch;
public BlockingVerticle(CountDownLatch latch) {
this.latch = latch;
}
@Override
public void start() throws InterruptedException {
long millis = 1000 + ThreadLocalRandom.current().nextInt(500);
System.out.println("It will take me " + millis + " to complete");
// Wait for some random time, but no longer that 1.5 seconds
Thread.sleep(millis);
latch.countDown();
}
}
Следует отметить, что основной поток будет заблокирован макс (B_OPERATION, C_OPERATION, D_OPERATION) + несколько млн. больше.
спасибо за ответ! Но мне кажется странным развернуть дополнительные вертикали для синхронизации. Я также не хочу их разворачивать, поскольку мое приложение работает с параметрами «-ha», что означает небольшие накладные расходы для каждой вершины. В качестве решения (которое также мне не нравится) я могу использовать pass 'CountDownLatch' внутри' vertx.executeBlocking'. Но vertx docs говорит, что vertx должен освободить ваш код от кода синхронизации, и это кажется неправильным. –
Прежде всего, вы абсолютно правы. Я думал, что у вас есть какая-то конкретная причина, по которой вам нужен код блокировки. Я добавлю еще один ответ с тем же кодом на основе Futures. –