2017-02-04 15 views
1

Я пишу веб-приложение на Java (а именно используя JavaLite). В этом веб-приложении у меня есть конечная точка, которая должна отправлять кучу других запросов на сервер при вызове. Так как эти запросы могут увеличить число, я решил послать эти запросы параллельно, используя Java API параллелизма, введенный в Java 8. Мой код для отправки несколько запросов параллельно выглядит следующим образом:Отправка запросов параллельно с ExecutorService в Java 8

public List<String> searchAll(List<String> keywords) { 
    ExecutorService executor = Executors.newWorkStealingPool(); 
    List<Callable<List<String>>> tasks = new ArrayList<>(); 
    for (String key : keywords) { 
     tasks.add(() -> { 
      LOGGER.info("Sending query for key: " + key); 
      return sendSearchQuery(key); 
     }); 
    } 
    List<String> all = new ArrayList<>(); 
    try { 
     executor.invokeAll(tasks) 
       .stream() 
       .map(future -> { 
        try { 
         return future.get(); 
        } 
        catch (Exception e) { 
         throw new IllegalStateException(e); 
        } 
       }) 
       .forEach((list) -> 
       { 
        LOGGER.info("Received list: " + list); 
        all.addAll(list); 
       }); 
    } catch (InterruptedException e) { 
     e.printStackTrace(); 
    } 
    return all; 
} 

private List<String> sendSearchQuery(String query) throws UnirestException { 
    long startTime = System.nanoTime(); 
    HttpResponse<JsonNode> response = Unirest.get(SEARCH_URL) 
      .queryString("q", query).asString(); 
    Map<String, Object> result = JsonHelper.toMap(response.getBody()); 
// Get get = Http.get(SEARCH_URL + "?q=" + query); 
// Map<String, Object> result = JsonHelper.toMap(get.text()); 
    LOGGER.info("Query received in " + (System.nanoTime() - startTime)/1000000 + " ms for key: " + query); 
    return (List<String>) result.get("result"); 
} 

И выход для этот кусок кода выглядит следующим образом:

[ForkJoinPool-2-worker-1] INFO app.managers.SearchManager - Sending query for key: sky 
[ForkJoinPool-2-worker-2] INFO app.managers.SearchManager - Sending query for key: outdoor 
[ForkJoinPool-2-worker-3] INFO app.managers.SearchManager - Sending query for key: bridge 
[ForkJoinPool-2-worker-0] INFO app.managers.SearchManager - Sending query for key: water 
[ForkJoinPool-2-worker-0] INFO app.managers.SearchManager - Query received in 1331 ms for key: water 
[ForkJoinPool-2-worker-0] INFO app.managers.SearchManager - Sending query for key: building 
[ForkJoinPool-2-worker-1] INFO app.managers.SearchManager - Query received in 1332 ms for key: sky 
[ForkJoinPool-2-worker-2] INFO app.managers.SearchManager - Query received in 1332 ms for key: outdoor 
[ForkJoinPool-2-worker-3] INFO app.managers.SearchManager - Query received in 1332 ms for key: bridge 
[ForkJoinPool-2-worker-0] INFO app.managers.SearchManager - Query received in 302 ms for key: building 
[[email protected]] INFO app.managers.SearchManager - Received list: [16973, 4564, 12392, 1195, 1207, 682, 10518, 10532, 10545, 19328, 10524, 10537, 10551, 19334, 10522, 10535, 10548, 19332, 10521, 10534] 
[[email protected]] INFO app.managers.SearchManager - Received list: [] 
[[email protected]] INFO app.managers.SearchManager - Received list: [4303, 2844, 4366] 
[[email protected]] INFO app.managers.SearchManager - Received list: [9490, 1638, 20006, 17715, 17758, 18788, 6071, 11230, 13384, 4940, 18039, 17871, 16629, 6148, 19172, 4263, 4569, 8396, 18643, 4904] 
[[email protected]] INFO app.managers.SearchManager - Received list: [17306, 17303, 17305, 17304, 16062, 16156, 16153, 16154, 16061, 9098, 2491, 4368, 22134, 1008, 16152, 16151, 16148, 16155, 16147, 16149] 

Как вы можете видеть, я использовал две разные библиотеки HTTP (JavaLite Http и Unirest), чтобы увидеть, если проблема была с библиотекой, который я использовал, однако это не кажется так как они оба дают ту же проблему.

Проблема заключается в том, что первые n (количество процессоров на машине) запрашивает начало и конец одновременно. Это нормально, однако они также занимают больше времени, чем должны. Скажем, один запрос принимает t время в нормальных условиях. В этом случае первые запросы n берут около n * t времени, а остальные запросы принимают t раз. Я неправильно использую API параллелизма?

Редактировать: Сервер, работающий на SEARCH_URL, развертывается на Azure и может обрабатывать несколько запросов.

Я также попытался следующие:

  • с помощью ExecutorService.newFixedThreadPool(), однако Executor я использую, кажется, не быть причиной этой проблемы.
  • , вызывающий invokeAny() вместо invokeAll(), однако прежний блокирует основной поток, пока одна из задач не завершится, и возвращает результаты этой задачи.

Edit 2: Так что я играл с сервером и приложением я в настоящее время работаю. Самое странное, что сервер реагирует на n запросов в разное время, однако приложение получает эти ответы после временного кадра, который начинается с момента, когда первый запрос достигает сервера и заканчивается с временем n. У меня нет объяснения этого поведения.

+0

Что произойдет, если вы просто поставить фиксированный сон вместо вызова HTTP? Таким образом, мы можем быть уверены, что это проблема с сервисом исполнителя или сервером. Кроме того, добавьте дату-время в регистратор и распечатайте результат. Будет легче визуализировать проблему, которую вы объяснили. –

+0

@MadPiranha работает, как и ожидалось, когда я не отправляю запросы. Проблема действительно связана с самим сервером. Я запустил сервер на localhost, и первые ответы 'n' действительно отправлены в течение более длительного времени. – halileohalilei

+0

В зависимости от того, что означает «первые n запросов», ваша проблема может быть связана с загрузкой и инициализацией соответствующих классов в пакете «java.util.concurrent». Я помню, как я читал об этом где-то, но, к сожалению, я не могу вспомнить, где именно. –

ответ

0
invokeAll(Collection<? extends Callable<T>> tasks) 
  • Выполнение поставленных задач, возвращая список фьючерсов, имеющих свой статус и результаты, когда все полные.

Имеется ли у вашего источника возможность обслуживать несколько запросов?

+0

Вы правы, документация действительно говорит «когда все завершено», однако остальные запросы отправляются нормально, а объекты «Будущие» возвращаются вовремя без каких-либо проблем. Кроме того, нет никаких альтернатив 'invokeAll()', который выполняет задачи, независимые друг от друга. Сервер развернут на Azure и может обрабатывать несколько запросов. – halileohalilei

+0

'' invokeAny() '' есть. И если вы используете '' Executors.newFixedThreadPool() '', вы можете отправлять несколько задач один за другим, и они будут выполняться параллельно. –

+0

Кроме того, это время ничего не говорит нам, кроме того, что он ждал, когда все четыре запроса будут завершены. –

0

Вы посмотрели на завершающую фреймворк, который был введен для java 8? Я мог бы помочь с тобой, пытаясь отправить все асинхронные.

List<CompletableFuture<List<String>>> futures = keywords.parallelStream() 
      .map(key -> CompletableFuture.supplyAsync(() -> sendSearchQuery(key), executor)) 
      .collect(toList()); 

CompletableFuture<Void> allOf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); 

try { 
    allOf.join(); 
} catch (Exception e){ 
    e.printStackTrace(); 
} 

List<String> all = futures.stream().filter(CompletableFuture::isCompletedExceptionally) 
      .flatMap(future -> future.join().stream()) 
      .collect(toList()); 

return all; 

Что это будет делать будет посылать все, что выбирает только асинхр, то вы звоните allOf.join(), вы будете ждать на все, чтобы вернуться.

Конечный поток затем отображает каждый результат обратно в один список и возвращает

 Смежные вопросы

  • Нет связанных вопросов^_^