2017-01-30 6 views
2

У меня есть приложение весны, которое должно обрабатывать и хранить входящие данные сокета, из-за проблем с горлышком бутылки это должно быть сделано с помощью нескольких потоков.Весенняя параллельная обработка нескольких очередей с единственным пулом потоков

Поступающие данные принадлежит многих лиц и каждая сущность задач должна обрабатывать серийно, но я думаю, что назначение одной нити каждого объекта не является хорошим решением (тысячи отдельных потоков для обработки очереди, принадлежащих каждой)

Так как я могу определить public ThreadPool для обработки очередей всех сущностей с помощью алгоритма тарифа?

ответ

0

Вы можете использовать Project reactor или RxJava, чтобы разделить поток входящих сообщений по группам и обрабатывать события в каждой группе по отдельности.

С проектом реактора ваш код может выглядеть так:

Scheduler groupScheduler = Schedulers.newParallel("groupByPool", 16); 
    Flux.fromStream(incomingMessages()) // stream of new data from socket 
      .groupBy(Message::getEntityId) // split incoming messages by groups, which should be processed serially 
      .map(g -> g.publishOn(groupScheduler)) //create new publisher for groups of messages 
      .subscribe(//create consumer for main stream 
        stream -> 
          stream.subscribe(this::processMessage) // create consumer for group stream and process messagaes 
      ); 
+0

в вашем примере кода, вы создаете новый параллельный пул потоков для каждой группы, который я указываю как плохое решение в моем вопросе. Обработка всех задач с помощью алгоритма тарифа с единственным пулом потоков - это моя цель – Mojtabye

+0

@Mojtabye согласен, извините, пропустил это. Обновленный ответ –

2

Вы описали идеальную проблему для решения с помощью архитектуры, управляемой сообщениями.

Spring Integration - это модуль, который предоставляет это для вас.

Вы можете создавать свои сервисы задач и комментировать @ServiceActivator и создавать свою цепочку с каналами.

Каналы могут иметь опции для выполнения в другом пуле потоков, а узкие места из-за нагрузки на шип можно преодолеть с настройками очереди на вашем канале.

Обязательно попробуйте проверить документацию по интеграции пружин.

+0

пожалуйста улучшить свой ответ, добавив исходный код. – Mojtabye