2017-02-17 23 views
1

Я пытаюсь использовать пул потоков для выполнения запросов блокировки. Проблема заключается в том, что каждый запрос блокирует весь пул, а элементы обрабатываются последовательно. Не уверен, что это возможно. Кто-то пожалуйста, помогитеAkka пул актеров для блокировки запросов

city-dispatcher { 
    type = Dispatcher 
    executor = "thread-pool-executor" 
    thread-pool-executor { 
    fixed-pool-size = 16 
    } 
    throughput = 100 
} 

И Java

 Props props = Props.create(CityDataProcessorActor.class, psRespHolder).withDispatcher("akka.actor.city-dispatcher"); 

    SmallestMailboxPool pool = new SmallestMailboxPool(10); 

    ActorRef cityRequestActorPool = actorSystem.actorOf(pool.props(props), "city-request-route"); 
    for (String city : citiesArray) { 
     Future<Object> future = Patterns.ask(cityRequestActorPool, new CityCommand(city.trim()), timeout); 
     Object results = Await.result(future, duration); 
     log.info(results.toString()); 
    } 
+1

'результаты Object = Await.result (будущее, продолжительность);' этот кусок кода блокировки. Ваш цикл 'for' не будет продвигаться до тех пор, пока не будут заявлены результаты. –

ответ

0

комментарий пн кальмары является точно. Вот реализация. Он создаст список фьючерсов при их создании. Затем он последовательно блокирует собранные Фьючерсы для регистрации каждого из них. Ожидания должны стать тривиальными по мере продолжения итерации, поскольку последующие фьючерсы завершатся в одно и то же время.

.... 
Array<Future<Object>> futures = new ArrayList<>(); 
for (String city : citiesArray) { 
    Future<Object> future = Patterns.ask(cityRequestActorPool, new CityCommand(city.trim()), timeout); 
    futures.add(future); 
} 

for (<Future<Object>> f :futures){ 
    Object results = Await.result(f, duration); 
    log.info(results.toString()); 
} 
0

В @Mon каламари упоминалось Object results = Await.result(future, duration); является блокировка вызова. Вы можете попробовать будущее с обратным вызовом

future onComplete{ case Success()=> println(result) case Failure()=> println("some error") }