Я использую реактор для публикации событий по всему приложению и реагирования различных потребителей на их события.Многопоточность для потребителей реакторов
Вот моя конфигурация реактора
@Configuration
@EnableReactor
public class ReactorConfiguration {
static {
Environment.initializeIfEmpty().assignErrorJournal();
}
@Bean
public EventBus eventBus() {
return EventBus.config().env(Environment.get()).dispatcher(Environment.SHARED).get();
}
Я ожидаю, что на основе кольца по умолчанию буфер диспетчеру используется и несколько сообщений, отправленных одному потребителю должны быть обработаны параллельно. Вместо этого он, похоже, обрабатывает события синхронно. Тема shared-1
используется для обработки моего события1 для потребителя1, а затем только после завершения обработки события1 тот же поток начинает обработку события2 на пользователе1.
Как добиться параллельной обработки, чтобы я мог отправлять несколько событий нескольким пользователям, а все события обрабатываются параллельно.
Буду признателен за любые предложения.
Это, как я направляю событие на автобус событий
dispatch(ReactorEvents.REPORT_REQUEST_EVENT, "", event);
protected <T> void dispatch(String selector, String info, T event) {
eventBus.notify(selector, Event.wrap(Tuple.of(info, event)));
}
и вот один из потребителей
@Consumer
public class ReportRequestHandler {
...
@Selector(ReactorEvents.REPORT_REQUEST_EVENT)
@Override
public void handleRequest(Tuple2<String, ReportRequestEvent> tuple) {
ReportRequestEvent event = tuple.getT2();
log.debug("processing report request " + event.getId());
....
}
}
кажется, просто переходя к 'диспетчерскому (Environment.WORK_QUEUE)' или 'диспетчер (Environment.THREAD_POOL)' делает что я хочу, но я думаю, мне нужно понять разницу между ними немного больше, а во-вторых, если threadpool будет использовать тот же самый поток, что и остальная часть приложения – adeelmahmood