2016-10-29 7 views
0

У меня есть четыре операции ввода-вывода: A, B, C и D. Каждый из них должен быть выполнен с vertx.executeBlocking. Я должен иметь следующее поведение:Vert.x сделать обещание от vertx.executeBlocking

//PSEUDOCODE 
waitForExecuteBlocking(A_OPERATION); 
thenWaitForAllExecuteBlocking(`B_OPERATION`, `C_OPERATION`, `D_OPERATION`) 
/* do something */ 

Как можно добиться такого поведения?

Я не могу найти solutioin в Vertx Rx. Есть причины, по которым я не хочу обматывать классы *_OPERATION как рабочие вертикали.

ответ

1

Я добавляю еще один ответ, на этот раз с Futures.
Прежде всего, обратите внимание, что это фьючерсы Vertx, а не обычные фьючерсы Java. Используйте правильный импорт.
Теперь код:

// I'm running in main(), so everything is static, just for the sake of example 
private static Vertx vertx = Vertx.vertx(); 
public static void main(String[] args) throws InterruptedException { 


    long start = System.currentTimeMillis(); 

    // In your case it should be operationA(), operationB(), etc 
    // But I wanted to make the code shorter 
    CompositeFuture.all(operationA(), operationA(), operationA()).setHandler((r) -> { 
     if (r.succeeded()) { 
      // You can even iterate all the results 
      List<String> results = r.result().list(); 
      for (String result : results) { 
       System.out.println(result); 
      } 
      // This will still print max(operationA, operationB, operationC) 
      System.out.println("Took me " + (System.currentTimeMillis() - start) + " millis"); 
     } 
     else { 
      System.out.println("Something went wrong"); 
     } 
    }); 
} 


// Return a future, then fulfill it after some time 
private static Future<String> operationA() { 
    Future<String> future = Future.future(); 

    long millis = 1000 + ThreadLocalRandom.current().nextInt(500); 
    vertx.setTimer(millis, (l) -> { 
     future.complete("All is good " + millis); 
    }); 

    return future; 
} 
1

Я сломаю свой ответ на две части. Это не будет зависеть от RxJava, но только от обычной Java.
Во-первых, ждать A_OPERATION

Vertx vertx = Vertx.vertx(); 

    CountDownLatch latch = new CountDownLatch(1); 

    Long start = System.currentTimeMillis(); 

    vertx.deployVerticle(new AbstractVerticle() { 

     @Override 
     public void start() throws InterruptedException { 
      // Just to demonstrate 
      Thread.sleep(1000); 

      latch.countDown(); 
     } 
    }); 

    // Always use await with timeout 
    latch.await(2, TimeUnit.SECONDS); 

    System.out.println("Took me " + (System.currentTimeMillis() - start) + " millis"); 

Теперь к более сложному примеру:

public static void main(String[] args) throws InterruptedException { 

    Vertx vertx = Vertx.vertx(); 

    // This should be equal to number of operations to complete 
    CountDownLatch latch = new CountDownLatch(3); 

    Long start = System.currentTimeMillis(); 

    // Start your operations 
    vertx.deployVerticle(new BlockingVerticle(latch)); 
    vertx.deployVerticle(new BlockingVerticle(latch)); 
    vertx.deployVerticle(new BlockingVerticle(latch)); 

    // Always use await with timeout 
    latch.await(2, TimeUnit.SECONDS); 

    System.out.println("Took me " + (System.currentTimeMillis() - start) + " millis"); 
} 


private static class BlockingVerticle extends AbstractVerticle { 

    private final CountDownLatch latch; 

    public BlockingVerticle(CountDownLatch latch) { 
     this.latch = latch; 
    } 

    @Override 
    public void start() throws InterruptedException { 


     long millis = 1000 + ThreadLocalRandom.current().nextInt(500); 

     System.out.println("It will take me " + millis + " to complete"); 

     // Wait for some random time, but no longer that 1.5 seconds 
     Thread.sleep(millis); 

     latch.countDown(); 
    } 

} 

Следует отметить, что основной поток будет заблокирован макс (B_OPERATION, C_OPERATION, D_OPERATION) + несколько млн. больше.

+0

спасибо за ответ! Но мне кажется странным развернуть дополнительные вертикали для синхронизации. Я также не хочу их разворачивать, поскольку мое приложение работает с параметрами «-ha», что означает небольшие накладные расходы для каждой вершины. В качестве решения (которое также мне не нравится) я могу использовать pass 'CountDownLatch' внутри' vertx.executeBlocking'. Но vertx docs говорит, что vertx должен освободить ваш код от кода синхронизации, и это кажется неправильным. –

+0

Прежде всего, вы абсолютно правы. Я думал, что у вас есть какая-то конкретная причина, по которой вам нужен код блокировки. Я добавлю еще один ответ с тем же кодом на основе Futures. –