2017-01-28 3 views
1

Проблема: рассмотрим сценарий, в котором есть несколько служб, которые могут выполнять определенную задачу. Каждый сервис может реагировать на задание с разным количеством времени. Нам нужно всегда выбирать ответ от самого быстрого обслуживания.Как извлечь результат из самого быстрого реагирующего потока из группы потоков?

ответ

-1

Ниже приведен пример рабочего кода с использованием RxJava Observables, который выводит результат из самой быстрой ответной нити из группы потоков.

public static void main(String[] args) { 
      // Create a slow thread which spans 5 secs 
      Callable<String> task1 = new Callable<String>() { 

        @Override 
        public String call() throws Exception { 
          Thread.sleep(5000); 
          return "task1"; 
        } 
      }; 
      // Create a faster thread which spans 1 secs 
      Callable<String> task2 = new Callable<String>() { 

        @Override 
        public String call() throws Exception { 
          Thread.sleep(1000); 
          return "task2"; 
        } 
      }; 
      List<Callable<String>> tasks = new ArrayList<>(); 
      tasks.add(task1); 
      tasks.add(task2); 

      String result = null; 
      try { 
        result = Observable.from(tasks) 
            .subscribeOn(Schedulers.computation()) 
            .flatMap(eachTask -> Observable.fromCallable(eachTask) 
                .subscribeOn(Schedulers.io()) 
                .doOnNext(e -> System.out.println("Executing your action on "+Thread.currentThread().getName())) 
                .doOnError(e -> System.out.println("Failed reason for : "+Thread.currentThread().getName()+" with error "+e.getMessage())) 
                ) 
            .toBlocking() 
            .first(); 

      } catch (Exception e) { 
        System.out.println(e.getMessage()); 
      } 
      System.out.println("result--->"+result); 
} 
+0

плохое предложение, это ожидание ВСЕХ из них, чтобы закончить, и THEN возьмите первый. –

+0

@ DanieleSegato nope, если вы могли бы выполнить код, как он есть, вы увидите, что он НЕ ДОЛЖЕН ВСЕХ из них завершить. Он выбирает ПЕРВЫЙ элемент, возвращаемый блокировкой. – Sabarish

+0

независимо, запрос требует слияния + принять (1) или merge + first() –

3

Если я прямо вас понял, что вам нужно что-то вроде этого:

taskSource 
    .flatMap(task -> // for each task 
     Observable.merge(
      // submit same task to multiple services 
      service1.submit(task), 
      service2.submit(task), 
      ..., 
      serviceN.submit(task) 
      ) 
      .take(1)) // take first response; discard others 
    ... // continue processing result of the task 
    .subscribe(...) 
2

Вы хотите оператор Observable.amb. Он имеет преимущество работы с Observables с более чем одной эмиссией.

+0

Спасибо, проверите его. – Sabarish

 Смежные вопросы

  • Нет связанных вопросов^_^