2016-09-02 12 views
0

У меня есть базовый проект, который имеет четыре вызова некоторого внешнего ресурса, который в текущей версии работает синхронно. То, что я хотел бы достичь, - это обернуть это вызовом в HystrixObservableCommand, а затем вызвать его асинхронно.Netflix Hystrix - HystrixObservableCommand асинхронный запуск

Из того, что я прочитал, после вызова .observe() в объекте HystrixObservableCommand обернутую логику следует вызывать немедленно и асинхронно. Однако я делаю что-то неправильно, потому что он работает синхронно.

В примере кода вывод Void, потому что меня не интересует выход (на данный момент). Вот почему я не назначил Observable для любого объекта, просто вызванного constructor.observe().

@Component 
public class LoggerProducer { 

    private static final Logger LOGGER = Logger.getLogger(LoggerProducer.class); 

    @Autowired 
    SimpMessagingTemplate template; 

    private void push(Iterable<Message> messages, String topic) throws Exception { 
     template.convertAndSend("/messages/"+topic, messages); 
    } 

    public void splitAndPush(Iterable<Message> messages) { 

     Map<MessageTypeEnum, List<Message>> groupByMessageType = StreamSupport.stream(messages.spliterator(), true) 
       .collect(Collectors.groupingBy(Message::getType)); 

     //should be async - it's not 
     new CommandPushToBrowser(groupByMessageType.get(MessageTypeEnum.INFO), 
       MessageTypeEnum.INFO.toString().toLowerCase()).observe(); 
     new CommandPushToBrowser(groupByMessageType.get(MessageTypeEnum.WARN), 
       MessageTypeEnum.WARN.toString().toLowerCase()).observe(); 
     new CommandPushToBrowser(groupByMessageType.get(MessageTypeEnum.ERROR), 
       MessageTypeEnum.ERROR.toString().toLowerCase()).observe(); 

    } 

    class CommandPushToBrowser extends HystrixObservableCommand<Void> { 

     private Iterable<Message> messages; 
     private String messageTypeName; 

     public CommandPushToBrowser(Iterable<Message> messages, String messageTypeName) { 
      super(HystrixCommandGroupKey.Factory.asKey("Messages")); 
      this.messageTypeName = messageTypeName; 
      this.messages = messages; 
     } 

     @Override 
     protected Observable<Void> construct() { 
      return Observable.create(new Observable.OnSubscribe<Void>() { 

       @Override 
       public void call(Subscriber<? super Void> observer) { 
        try { 
         for (int i = 0 ; i < 50 ; i ++) { 
          LOGGER.info("Count: " + i + " messageType " + messageTypeName); 
         } 
         if (null != messages) { 
          push(messages, messageTypeName); 
          LOGGER.info("Message type: " + messageTypeName + " pushed: " + messages); 
         } 
         if (!observer.isUnsubscribed()) { 
          observer.onCompleted(); 
         } 
        } catch (Exception e) { 
         e.printStackTrace(); 
         observer.onError(e); 
        } 
       } 
      }); 
     } 
    } 
} 

Есть некоторые чистые «тест» фрагменты кода там, как я пытался выяснить проблему, просто игнорировать логику, основной упор делается, чтобы запустить его с асинхронной .observe(). Я знаю, что могу достичь этого со стандартом HystrixCommand, но это не цель.

Надежды кто-то помогает :) С уважением,

ответ

2

Ответ был найден:

«Наблюдаемый не добавлять параллелизм автоматически, если вы моделирование синхронное, блокирующее выполнения с наблюдаемом, то они будут. Выполнять синхронно.

Вы можете легко сделать это асинхронным путем планирования по потоку с помощью subscribeOn (Schedulers.io()). Вот простой пример для упаковки индивидуального вызова блокировки с наблюдаемом: https://speakerdeck.com/benjchristensen/applying-reactive-programming-with-rxjava-at-goto-chicago-2015?slide=33

Однако, если вы оберточной блокирующие вызовы, вы должны просто придерживаться с помощью HystrixCommand, как это то, что он построен для и он по умолчанию работает все в отдельном нить. Используя , HystrixCommand.observe() предоставит вам подходящую, асинхронную композицию, которую вы ищете.

HystrixObservableCommand предназначен для наматывания асинхронного, неблокирующего Наблюдаемых которые не нуждаются в дополнительной теме»

- Бен Christensen - Netflix Край Engineering

. Источник: https://groups.google.com/forum/#!topic/hystrixoss/g7ZLIudE8Rs

 Смежные вопросы

  • Нет связанных вопросов^_^