2015-04-29 2 views
0

Я использую реактор для публикации событий по всему приложению и реагирования различных потребителей на их события.Многопоточность для потребителей реакторов

Вот моя конфигурация реактора

@Configuration 
@EnableReactor 
public class ReactorConfiguration { 

    static { 
     Environment.initializeIfEmpty().assignErrorJournal(); 
    } 

    @Bean 
    public EventBus eventBus() { 
     return EventBus.config().env(Environment.get()).dispatcher(Environment.SHARED).get(); 
} 

Я ожидаю, что на основе кольца по умолчанию буфер диспетчеру используется и несколько сообщений, отправленных одному потребителю должны быть обработаны параллельно. Вместо этого он, похоже, обрабатывает события синхронно. Тема shared-1 используется для обработки моего события1 для потребителя1, а затем только после завершения обработки события1 тот же поток начинает обработку события2 на пользователе1.

Как добиться параллельной обработки, чтобы я мог отправлять несколько событий нескольким пользователям, а все события обрабатываются параллельно.

Буду признателен за любые предложения.

Это, как я направляю событие на автобус событий

dispatch(ReactorEvents.REPORT_REQUEST_EVENT, "", event); 

protected <T> void dispatch(String selector, String info, T event) { 
    eventBus.notify(selector, Event.wrap(Tuple.of(info, event))); 
} 

и вот один из потребителей

@Consumer 
public class ReportRequestHandler { 
... 
    @Selector(ReactorEvents.REPORT_REQUEST_EVENT) 
    @Override 
    public void handleRequest(Tuple2<String, ReportRequestEvent> tuple) { 
    ReportRequestEvent event = tuple.getT2(); 
    log.debug("processing report request " + event.getId()); 
    .... 
    } 
} 
+0

кажется, просто переходя к 'диспетчерскому (Environment.WORK_QUEUE)' или 'диспетчер (Environment.THREAD_POOL)' делает что я хочу, но я думаю, мне нужно понять разницу между ними немного больше, а во-вторых, если threadpool будет использовать тот же самый поток, что и остальная часть приложения – adeelmahmood

ответ

1

по умолчанию реализация Кольцевого буфера является однопоточной. Вот почему вы наблюдаете побочные эффекты, которые вы описали. Более подробно о различных диспетчерских вариантов, доступных для вас, пожалуйста, ознакомьтесь со следующей ссылкой:

http://projectreactor.io/docs/reference/#_dispatchers