0

Использование проекта Reactor 3.0.4.RELEASE. Концептуально, должен быть таким же и в RxJava.Реактивное распараллеливание не работает

public Mono<Map<String, Boolean>> refreshPods(List<String> apps) { 
    return pods(apps) 
      .filter(this::isRunningAndNotThisApp) 
      .groupBy(Item::getName) 
      .flatMap(g -> g 
        .distinct(Item::getIp) 
        .collectList() 
        // TODO: This doesn't seem to be working as expected 
        .subscribeOn(Schedulers.newParallel("par-grp")) 
        .flatMap(client::refreshPods)) 
      .flatMap(m -> Flux.fromIterable(m.entrySet())) 
      .collectMap(Map.Entry::getKey, Map.Entry::getValue); 
} 

Идея состоит в том, чтобы запускать client.refreshPods в отдельном потоке для каждой группы.

Редактировать: Я пробовал publishOn, прежде чем размещать этот вопрос и после ответов, приведенных здесь, но выход не изменяется.

Клиент:

public class MyServiceClientImpl implements MyServiceClient { 
    private final RestOperations restOperations; 
    private final ConfigRefreshProperties configRefreshProperties; 

    public Mono<Map<String, Boolean>> refreshPods(List<Item> pods) { 
     return Flux.fromIterable(pods) 
       .zipWith(Flux.interval(Duration.ofSeconds(configRefreshProperties.getRefreshDelaySeconds())), 
         (x, delay) -> x) 
       .flatMap(this::refreshWithRetry) 
       .collectMap(Tuple2::getT1, Tuple2::getT2); 
    } 

    private Mono<Tuple2<String, Boolean>> refreshWithRetry(Item pod) { 
     return Mono.<Boolean>create(emitter -> { 
      try { 
       log.info("Attempting to refresh pod: {}.", pod); 
       ResponseEntity<String> tryRefresh = refresh(pod); 

       if (!tryRefresh.getStatusCode().is2xxSuccessful()) { 
        log.error("Failed to refresh pod: {}.", pod); 
        emitter.success(); 
       } else { 
        log.info("Successfully refreshed pod: {}.", pod); 
        emitter.success(true); 
       } 
      } catch (Exception e) { 
       emitter.error(e); 
      } 
     }) 
       .map(b -> Tuples.of(pod.getIp(), b)) 
       .log(getClass().getName(), Level.FINE) 
       .retryWhen(errors -> { 
        int maxRetries = configRefreshProperties.getMaxRetries(); 
        return errors.zipWith(Flux.range(1, maxRetries + 1), (ex, i) -> Tuples.of(ex, i)) 
          .flatMap(t -> { 
           Integer retryCount = t.getT2(); 
           if (retryCount <= maxRetries && shouldRetry(t.getT1())) { 
            int retryDelaySeconds = configRefreshProperties.getRetryDelaySeconds(); 
            long delay = (long) Math.pow(retryDelaySeconds, retryCount); 
            return Mono.delay(Duration.ofSeconds(delay)); 
           } 
           log.error("Done retrying to refresh pod: {}.", pod); 
           return Mono.<Long>empty(); 
          }); 
       }); 
    } 

    private ResponseEntity<String> refresh(Item pod) { 
     return restOperations.postForEntity(buildRefreshEndpoint(pod), null, String.class); 
    } 

    private String buildRefreshEndpoint(Item pod) { 
     return UriComponentsBuilder.fromUriString("http://{podIp}:{containerPort}/refresh") 
       .buildAndExpand(pod.getIp(), pod.getPort()) 
       .toUriString(); 
    } 

    private boolean shouldRetry(Throwable t) { 
     boolean clientError = ThrowableAnalyzer.getFirstOfType(t, HttpClientErrorException.class) 
       .map(HttpClientErrorException::getStatusCode) 
       .filter(s -> s.is4xxClientError()) 
       .isPresent(); 

     boolean timeoutError = ThrowableAnalyzer.getFirstOfType(t, TimeoutException.class) 
       .isPresent(); 

     return timeoutError || !clientError; 
    } 
} 

Проблема заключается в том, что утверждение журнала Attempting to refresh pod печатается на одной и той же нити для каждой группы. Что мне здесь не хватает?

Бревно из теста:

2017-02-07 10:g12:55.348 INFO 33905 --- [  timer-1] c.n.d.cloud.config.MyServiceClientImpl : Attempting to refresh pod: Item(name=news, ip=127.0.0.1, port=8888, podPhase=Running). 
2017-02-07 10:12:55.357 INFO 33905 --- [  timer-1] c.n.d.cloud.config.MyServiceClientImpl : Successfully refreshed pod: Item(name=news, ip=127.0.0.1, port=8888, podPhase=Running). 
2017-02-07 10:12:55.358 INFO 33905 --- [  timer-1] c.n.d.cloud.config.MyServiceClientImpl : Attempting to refresh pod: Item(name=parking, ip=127.0.0.1, port=8888, podPhase=Running). 
2017-02-07 10:12:55.363 INFO 33905 --- [  timer-1] c.n.d.cloud.config.MyServiceClientImpl : Successfully refreshed pod: Item(name=parking, ip=127.0.0.1, port=8888, podPhase=Running). 
2017-02-07 10:12:55.364 INFO 33905 --- [  timer-1] c.n.d.cloud.config.MyServiceClientImpl : Attempting to refresh pod: Item(name=localsearch, ip=127.0.0.1, port=8888, podPhase=Running). 
2017-02-07 10:12:55.368 INFO 33905 --- [  timer-1] c.n.d.cloud.config.MyServiceClientImpl : Successfully refreshed pod: Item(name=localsearch, ip=127.0.0.1, port=8888, podPhase=Running). 
2017-02-07 10:12:55.369 INFO 33905 --- [  timer-1] c.n.d.cloud.config.MyServiceClientImpl : Attempting to refresh pod: Item(name=auth, ip=127.0.0.1, port=8888, podPhase=Running). 
2017-02-07 10:12:55.372 INFO 33905 --- [  timer-1] c.n.d.cloud.config.MyServiceClientImpl : Successfully refreshed pod: Item(name=auth, ip=127.0.0.1, port=8888, podPhase=Running). 
2017-02-07 10:12:55.373 INFO 33905 --- [  timer-1] c.n.d.cloud.config.MyServiceClientImpl : Attempting to refresh pod: Item(name=log, ip=127.0.0.1, port=8888, podPhase=Running). 
2017-02-07 10:12:55.377 INFO 33905 --- [  timer-1] c.n.d.cloud.config.MyServiceClientImpl : Successfully refreshed pod: Item(name=log, ip=127.0.0.1, port=8888, podPhase=Running). 
2017-02-07 10:12:55.378 INFO 33905 --- [  timer-1] c.n.d.cloud.config.MyServiceClientImpl : Attempting to refresh pod: Item(name=fuel, ip=127.0.0.1, port=8888, podPhase=Running). 
2017-02-07 10:12:55.381 INFO 33905 --- [  timer-1] c.n.d.cloud.config.MyServiceClientImpl : Successfully refreshed pod: Item(name=fuel, ip=127.0.0.1, port=8888, podPhase=Running). 
+0

у вас есть журнал, вы можете поделиться своим выходом? также вы можете подтвердить, что может быть несколько «Предмет» с тем же «именем» (т.е. группировка действительно производит группы с несколькими элементами)? Вы пробовали 'publishOn', а не' subscribeOn', поскольку вас интересует только выполнение нисходящего потока в отдельном потоке? –

+0

Вам нужно «публиковать» после строки TODO, а не 'subscribeOn'. – akarnokd

+0

@akarnokd Это тоже мое понимание, поскольку я хочу испускать отдельные потоки, не работает. См. Править. –

ответ

0

Я отправляю ответ сам для полноты картины. С помощью @ simon-baslé и @akarnokd я понял все правильно. Обе следующие работы. См. reactor-core#421.

Решение 1:

zipWith(Flux.interval(Duration.ofSeconds(groupMemberDelaySeconds)), 
    (x, delay) -> x) 
.publishOn(Schedulers.newParallel("par-grp")) 
.flatMap(this:: refreshWithRetry) 

Решение 2:

zipWith(Flux.intervalMillis(1000 * groupMemberDelaySeconds, Schedulers.newTimer("par-grp")), 
    (x, delay) -> x) 
.flatMap(this:: refreshWithRetry) 

Нет subscribeOn или publishOn необходим в методе refreshPods.

1

редактировать: Как сделать более явные благодаря вашему вновь предоставленному бревну, и, как подобран Давиду в этом вопросе вы создали, основная причина является то, что здесь вы используете interval. Это переключит контекст на значение по умолчанию TimedScheduler (что будет одинаково для всех групп). Вот почему все, что делалось до вызова refreshPods, кажется, игнорируется (работа выполняется на потоке интервалов), но publishOn/subscribeOn после оператор интервала должен работать. Вкратце мой совет по использованию subscribeOn непосредственно после create по-прежнему стоит.

Вы вызываете блокирующее поведение (refresh(pod)), которое вы обмениваете как Mono в refreshWithRetry.

Если у вас нет сильной потребности в том, чтобы быть несовместимым на этом уровне, я бы посоветовал немедленно связать subscribeOn рядом с create.

Таким образом, нет ничего удивительного в использовании этого Mono: он соблюдает договор и не блокирует его. Как это:

return Mono.<Boolean>create(emitter -> { 
     //... 
    }) 
.subscribeOn(Schedulers.newParallel("par-grp")) 
.map(b -> Tuples.of(pod.getIp(), b)) 

Если вы хотите метод вернуть параллелизм-агностик издателя, то вы должны были бы поставить subscribeOn ближе к блокирующему издателю, так что вам нужно расширить flatMap лямбду:

.flatMap(pods -> client.refreshPods(pods) 
         .subscribeOn(Schedulers.newParallel("par-grp")) 
) 
+0

Ваше второе решение не работает, см. m не так, ваше первое решение будет запускать каждый элемент в группе по отдельному потоку, правильно? Это отличается от запуска каждой группы в отдельном потоке, но каждый элемент в том же потоке. –

+0

Filed [# 421] (https: // github.com/reactor/reactor-core/issues/421) –

0

В вашем коде вы положили publishOn до flatMap. Методы, объединяющие наблюдаемые значения, такие как flatMap или zip, выполняют свою собственную перепланировку при работе с асинхронными источниками. interval - такой асинхронный источник в вашем случае. Вот почему вы получаете все результаты по теме «таймер».

1) Используйте publishOnперед операцией, которую вы хотите параллельно проводить. Сама операция не должна включать перепланировку. Например. map - хороший, flatMap - это плохо.

2) Используйте другой publishOn сразу после него, чтобы перенести результаты.В противном случае поток подписчика может помешать.

Flux.range(1, 100) 
     .groupBy(i -> i % 5) 
     .flatMap(group -> group 
       .publishOn(Schedulers.newParallel("grp", 8)) 
       .map(v -> { 
        // processing here 
        String threadName = Thread.currentThread().getName(); 
        logger.info("processing {} from {} on {}", v, group.key(), threadName); 
        return v; 
       }) 
       .publishOn(Schedulers.single()) 
     ) 
     .subscribe(v -> logger.info("got {}", v)); 

В случае, если вы хотите, чтобы убедиться, что элементы всех групп работают на тот же поток увидеть этот ответ: https://stackoverflow.com/a/41697348/697313

+0

Я не уверен, что второй «publishOn» имеет смысл, и он все равно не работает. –

+0

@AbhijitSarkar Попробуйте мой пример - я протестировал его на 3.0 .4, и он работает, а затем постепенно создайте свой код поверх него. Попробуйте с и без второго 'publishOn' и посмотрите. –

+0

На самом деле, я предлагаю вам попробовать пример в билете [# 421] (https: // github. com/реактор/ядро-реактор/проблемы/421). Проблема с вашим примером - это то, что я называю «привет мир», синдром; он касается только части проблемы, и иногда часть не подходит для большей головоломки. Просто для ударов я запустил ваш код для примера в билете, и, как я уже говорил, он не работал для моего варианта использования. Попробуйте сами и дайте мне знать. –

 Смежные вопросы

  • Нет связанных вопросов^_^