2009-07-19 4 views
3

У меня есть метод, который принимает массив запросов, и мне нужно запускать их против различных веб-API поисковой системы, таких как Google или Yahoo. Чтобы распараллелить процесс, для каждого запроса создается поток, который затем заканчивается join ed, так как мое приложение может продолжить только после У меня есть результаты каждый запрос. Я в настоящее время есть что-то вдоль этих линий:Операция многопоточного поиска

public abstract class class Query extends Thread { 
    private String query; 

    public abstract Result[] querySearchEngine(); 
    @Override 
    public void run() { 
     Result[] results = querySearchEngine(query); 
     Querier.addResults(results); 
    } 

} 

public class GoogleQuery extends Query { 
    public Result querySearchEngine(String query) { 
     // access google rest API 
    } 
} 

public class Querier { 
    /* Every class that implements Query fills this array */ 
    private static ArrayList<Result> aggregatedResults; 

    public static void addResults(Result[]) { // add to aggregatedResults } 

    public static Result[] queryAll(Query[] queries) { 
     /* for each thread, start it, to aggregate results */ 
     for (Query query : queries) { 
      query.start(); 
     } 
     for (Query query : queries) { 
      query.join(); 
     } 
     return aggregatedResults; 
    } 
} 

В последнее время я обнаружил, что есть новый API в Java для выполнения параллельных заданий. А именно, интерфейс Callable, FutureTask и ExecutorService. Мне было интересно, будет ли этот новый API использоваться, и если они более эффективны, чем традиционные, Runnable и Thread.

После изучения этого нового API, я придумал следующий код (упрощенный вариант):

public abstract class Query implements Callable<Result[]> { 
     private final String query; // gets set in the constructor 

     public abstract Result[] querySearchEngine(); 
     @Override 
     public Result[] call() { 
      return querySearchEngine(query); 
     } 
    } 

public class Querier { 
     private ArrayList<Result> aggregatedResults; 

     public Result[] queryAll(Query[] queries) { 
      List<Future<Result[]>> futures = new ArrayList<Future<Result[]>>(queries.length); 
      final ExecutorService service = Executors.newFixedThreadPool(queries.length); 
      for (Query query : queries) { 
       futures.add(service.submit(query)); 
      } 
      for (Future<Result[]> future : futures) { 
       aggregatedResults.add(future.get()); // get() is somewhat similar to join? 
      } 
      return aggregatedResults; 
     } 
    } 

Я новичок в этом параллелизм API, и я хотел бы знать, если есть что-то, что может улучшен в приведенном выше коде, и если он лучше, чем первый вариант (с использованием Thread). Есть несколько классов, которые я не изучал, например, FutureTask и т. Д. Мне бы хотелось услышать какие-либо советы по этому поводу.

+0

Выглядит хорошо, не уверен, что я что-то изменил в вашем втором примере. В вашем первом примере я бы расширил Runnable и не Thread, но это просто nitpicking. –

+0

+1, Это достаточно хорошо для меня. – akarnokd

ответ

7

Несколько проблем с вашим кодом.

  1. Возможно, вы должны использовать метод ExecutorService.invokeAll(). Стоимость создания новых потоков и нового пула потоков может быть значительной (хотя, возможно, не сравнивается с вызовами внешних поисковых систем). invokeAll() может управлять потоками для вас.
  2. Возможно, вы не хотите смешивать массивы и дженерики.
  3. Вы вызываете aggregatedResults.add() вместо addAll().
  4. Вам не нужно использовать переменные-члены, если они могут быть локальными для вызова функции queryAll().

Так, что-то вроде следующего должно работать:

public abstract class Query implements Callable<List<Result>> { 
    private final String query; // gets set in the constructor 

    public abstract List<Result> querySearchEngine(); 
    @Override 
    public List<Result> call() { 
     return querySearchEngine(query); 
    } 
} 

public class Querier { 
    private static final ExecutorService executor = Executors.newCachedThreadPool(); 

    public List<Result> queryAll(List<Query> queries) { 
     List<Future<List<Result>>> futures = executor.submitAll(queries); 
     List<Result> aggregatedResults = new ArrayList<Result>(); 
     for (Future<List<Result>> future : futures) { 
      aggregatedResults.addAll(future.get()); // get() is somewhat similar to join? 
     } 
     return aggregatedResults; 
    } 
} 
+0

. Изменение пула кешированных потоков может быть не лучшим вариантом, так как ваше приложение привязано к IO, так как большинство поисковых систем очень быстрые и будут оперативно реагировать , – akarnokd

+0

@ kd304: Действительно, поисковые системы, которые я использую, довольно быстр (Google и Yahoo, в настоящее время). Тем не менее, я использую много запросов, отсюда необходимость параллелизма. Каков ваш совет по этому поводу? Из того, что я читал в javadoc метода newCachedThreadPool, это, похоже, соответствует моим целям. Но опять же, я довольно новичок в этом API. –

+0

@Avi: Большое спасибо за предложения! –

4

Как futher улучшение, вы можете посмотреть в использовании CompletionService Это разъединяет порядок подачи и извлечения, вместо того, чтобы поставить все будущие результаты на очереди, из которой вы берете результаты в порядке, они будут завершены ..

+0

Поскольку приложение может продолжаться только в этом случае после завершения каждой задачи *, здесь может быть не подходит CompletionService. – Avi

+0

@Avi: Я не согласен, это просто не так хорошо, как будущее.получить(). – akarnokd

+0

@ kd304: Какой метод CompletionService вы использовали бы, чтобы получить все результаты набора задач? – Avi

3

Могу ли я предложить вам использовать Future.get() with a timeout?

В противном случае он будет принимать только один поисковик будучи не отвечает довести все до полной остановки (она даже не нужно быть проблемой поисковой системы, если, скажем, у вас есть проблемы сети на вашем конце)

+0

Спасибо. Каково типичное значение тайм-аута, которое используется для такого рода операций? –

+0

Я думаю, вам нужно спросить себя, как долго вы будете готовы ждать :-) Сделайте его настраиваемым и настройте его (скажем) на 10 раз обычное время отклика. –

+0

Я думаю, что правильный слой в коде для таймаута - это не Future.get(), это сетевой (HTTP?) Вызов самой поисковой системе. Если поисковая система выходит из строя, лучше ее следует поймать и не связывать нить, которая больше не нужна. – Avi