4

Проект образец находится здесь: https://github.com/codependent/spring5-playgroundSpring 5 Web Reactive - Hot Publishing - Как использовать EmitterProcessor соединить с MessageListener в поток событий

Я хотел бы навести на полученное сообщение из очереди JMS в Reactive Контроллер, который будет публиковать сообщения как поток событий.

Я не хочу, чтобы сообщения воспроизводились, то есть, если сообщение прибывает, и нет какого-либо подписчика, я не хочу, чтобы они отправлялись позже, когда какие-либо подбриты, поэтому я использую EmitterProcessor:

@Component 
public class AlertEmitterProcessor { 

    private Logger logger = LoggerFactory.getLogger(getClass()); 

    private EmitterProcessor<Alert> processor; 

    public AlertEmitterProcessor(){ 
     processor = EmitterProcessor.<Alert>create(); 
     processor.connect(); 
    } 

    public EmitterProcessor<Alert> getProcessor() { 
     return processor; 
    } 

    public void onNext(Alert alert){ 
     logger.info("onNext [{}]", alert); 
     processor.onNext(alert); 
    } 

    public void onComplete(){ 
     logger.info("onComplete"); 
     processor.onComplete(); 
    } 

    public void onError(Throwable t){ 
     logger.error("onError", t); 
     processor.onError(t); 
    } 
} 

Это мой MessageListener:

@Component 
public class AlertMessageListener implements MessageListener{ 

    private Logger logger = LoggerFactory.getLogger(getClass()); 

    @Autowired 
    private AlertEmitterProcessor alertProcessor; 

    @Autowired 
    private MappingJackson2HttpMessageConverter jacksonMessageConverter; 

    @Override 
    public void onMessage(Message message) { 
     logger.info("Message received: [{}]", message); 
     TextMessage tm = (TextMessage)message; 
     try { 
      Alert alert = jacksonMessageConverter.getObjectMapper().readValue(tm.getText(), Alert.class); 
      alertProcessor.onNext(alert); 
     } catch (IOException | JMSException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } 
    } 
} 

И, наконец, мой Rest контроллер:

@Autowired 
private AlertEmitterProcessor alertTopicProcessor; 

@Autowired 
private AlertMessageListener messageListener; 

@Autowired 
private MappingJackson2HttpMessageConverter jacksonMessageConverter; 

@GetMapping(value="/accounts/{id}/alerts/live2", produces="text/event-stream") 
public Flux<Alert> getAccountAlertsStreaming2(@PathVariable Integer id) { 
    return alertTopicProcessor.getProcessor() 
     .log().filter(a -> a.getAccountId().equals(id)); 
} 

Чтобы проверить его поведение я добавил этот метод контроллера для имитации вставки в очереди:

@GetMapping(value="/mock/accounts/{id}/alerts/put", produces="text/event-stream") 
public void putAlert(@PathVariable Integer id) throws JsonProcessingException { 
    Alert alert = new Alert(id, (long)Math.round(Math.random()*10), "Message"); 
    String alertStr = jacksonMessageConverter.getObjectMapper().writeValueAsString(alert); 
    TextMessage tm = new MockTextMessage(alertStr); 
    messageListener.onMessage(tm); 
} 

Сразу же после запуска приложения я загрузить http://localhost:8080/accounts/1/alerts/live2 и браузер ожидает данных.

2016-10-03 13:43:38.755 DEBUG 12800 --- [nio-8080-exec-1] o.s.web.reactive.DispatcherHandler  : Processing GET request for [http://localhost:8080/accounts/1/alerts/live2] 
2016-10-03 13:43:38.770 DEBUG 12800 --- [nio-8080-exec-1] s.w.r.r.m.a.RequestMappingHandlerMapping : Looking up handler method for path /accounts/1/alerts/live2 
2016-10-03 13:43:38.778 DEBUG 12800 --- [nio-8080-exec-1] s.w.r.r.m.a.RequestMappingHandlerMapping : Returning handler method [public reactor.core.publisher.Flux<com.codependent.spring5.playground.reactive.dto.Alert> com.codependent.spring5.playground.reactive.web.AccountsRestController.getAccountAlertsStreaming2(java.lang.Integer)] 
2016-10-03 13:43:38.779 DEBUG 12800 --- [nio-8080-exec-1] o.s.b.f.s.DefaultListableBeanFactory  : Returning cached instance of singleton bean 'accountsRestController' 
2016-10-03 13:43:38.800 INFO 12800 --- [nio-8080-exec-1] reactor.unresolved      : onSubscribe([email protected]) 
2016-10-03 13:43:38.802 INFO 12800 --- [nio-8080-exec-1] reactor.unresolved      : request(unbounded) 
2016-10-03 13:43:38.803 INFO 12800 --- [nio-8080-exec-1] reactor.unresolved      : onNext(1) 
2016-10-03 13:43:38.822 INFO 12800 --- [nio-8080-exec-1] reactor.Flux.EmitterProcessor.2   : onSubscribe([email protected]f2) 
2016-10-03 13:43:38.822 INFO 12800 --- [nio-8080-exec-1] reactor.Flux.EmitterProcessor.2   : request(1) 
2016-10-03 13:43:38.823 INFO 12800 --- [nio-8080-exec-1] reactor.unresolved      : onComplete() 

Затем я публикую некоторые сообщения http://localhost:8080/mock/accounts/1/alerts/put.

2016-10-03 13:43:43.063 DEBUG 12800 --- [nio-8080-exec-2] o.s.web.reactive.DispatcherHandler  : Processing GET request for [http://localhost:8080/mock/accounts/1/alerts/put] 
2016-10-03 13:43:43.063 DEBUG 12800 --- [nio-8080-exec-2] s.w.r.r.m.a.RequestMappingHandlerMapping : Looking up handler method for path /mock/accounts/1/alerts/put 
2016-10-03 13:43:43.068 DEBUG 12800 --- [nio-8080-exec-2] s.w.r.r.m.a.RequestMappingHandlerMapping : Returning handler method [public void com.codependent.spring5.playground.reactive.web.AccountsRestController.putAlert(java.lang.Integer) throws com.fasterxml.jackson.core.JsonProcessingException] 
2016-10-03 13:43:43.069 DEBUG 12800 --- [nio-8080-exec-2] o.s.b.f.s.DefaultListableBeanFactory  : Returning cached instance of singleton bean 'accountsRestController' 
2016-10-03 13:43:43.071 INFO 12800 --- [nio-8080-exec-2] reactor.unresolved      : onSubscribe([email protected]) 
2016-10-03 13:43:43.071 INFO 12800 --- [nio-8080-exec-2] reactor.unresolved      : request(unbounded) 
2016-10-03 13:43:43.072 INFO 12800 --- [nio-8080-exec-2] reactor.unresolved      : onNext(1) 
2016-10-03 13:43:43.112 INFO 12800 --- [nio-8080-exec-2] c.c.s.p.r.message.AlertMessageListener : Message received: [com.[email protected]37262c9e] 
2016-10-03 13:43:43.145 INFO 12800 --- [nio-8080-exec-2] c.c.s.p.r.message.AlertEmitterProcessor : onNext [Alert [alertId=3, message=Message, accountId=1]] 
2016-10-03 13:43:43.146 INFO 12800 --- [nio-8080-exec-2] reactor.Flux.EmitterProcessor.2   : onNext(Alert [alertId=3, message=Message, accountId=1]) 
2016-10-03 13:43:43.177 INFO 12800 --- [nio-8080-exec-2] reactor.unresolved      : onComplete() 
2016-10-03 13:43:43.177 DEBUG 12800 --- [nio-8080-exec-2] o.s.h.s.r.ServletHttpHandlerAdapter  : Successfully completed request 

Но никто не попадает в обозреватель. Это событие заканчивается ошибкой 500 (без журнала).

После некоторых ручных попыток он начинает получать данные ...

2016-10-03 13:45:07.726 DEBUG 12800 --- [nio-8080-exec-8] s.w.r.r.m.a.RequestMappingHandlerMapping : Returning handler method [public reactor.core.publisher.Flux<com.codependent.spring5.playground.reactive.dto.Alert> com.codependent.spring5.playground.reactive.web.AccountsRestController.getAccountAlertsStreaming2(java.lang.Integer)] 
2016-10-03 13:45:07.726 DEBUG 12800 --- [nio-8080-exec-8] o.s.b.f.s.DefaultListableBeanFactory  : Returning cached instance of singleton bean 'accountsRestController' 
2016-10-03 13:45:07.727 INFO 12800 --- [nio-8080-exec-8] reactor.unresolved      : onSubscribe([email protected]) 
2016-10-03 13:45:07.727 INFO 12800 --- [nio-8080-exec-8] reactor.unresolved      : request(unbounded) 
2016-10-03 13:45:07.727 INFO 12800 --- [nio-8080-exec-8] reactor.unresolved      : onNext(1) 
2016-10-03 13:45:07.729 INFO 12800 --- [nio-8080-exec-8] reactor.Flux.EmitterProcessor.9   : onSubscribe([email protected]e) 
2016-10-03 13:45:07.729 INFO 12800 --- [nio-8080-exec-8] reactor.Flux.EmitterProcessor.9   : request(1) 
2016-10-03 13:45:07.729 INFO 12800 --- [nio-8080-exec-8] reactor.Flux.EmitterProcessor.9   : onNext(Alert [alertId=4, message=Message, accountId=1]) 
2016-10-03 13:45:07.730 INFO 12800 --- [nio-8080-exec-8] reactor.unresolved      : onComplete() 
2016-10-03 13:45:07.747 INFO 12800 --- [nio-8080-exec-8] reactor.Flux.EmitterProcessor.9   : request(1) 
2016-10-03 13:45:07.747 INFO 12800 --- [nio-8080-exec-8] reactor.Flux.EmitterProcessor.9   : onNext(Alert [alertId=0, message=Message, accountId=1]) 
2016-10-03 13:45:07.748 INFO 12800 --- [nio-8080-exec-8] reactor.Flux.EmitterProcessor.9   : request(1) 

... но многие другие времена это не становится.

ответ

 Смежные вопросы

  • Нет связанных вопросов^_^