2015-02-19 4 views
3

Я пытаюсь выполнить некоторые численные вычисления на большом распределенном наборе данных. Алгоритмы хорошо согласуются с моделью MapReduce с дополнительным свойством, которое выводится с шага карты малым по сравнению с входными данными. Данные могут считаться доступными только для чтения и статически распределяются по узлам (за исключением повторной балансировки при отказе). Обратите внимание, что это несколько противоречит стандартным примерам примеров слов, где входные данные отправляются узлам, выполняющим шаг карты.GridGain: MapReduce с локальной обработкой данных на узле?

Это означает, что шаг карты должен выполняться параллельно на всех узлах, обрабатывая локальные данные каждого узла, в то время как допустимо, что вывод с шага карты отправляется на один узел для этапа уменьшения.

Каков наилучший способ реализовать это с помощью GridGain?

Похоже, что существует метод reduce (..) на интерфейсах GridCache/GridCacheProjection в более ранних версиях GridGain, но этого больше нет. Есть ли замена? Я имею в виду механизм, который берет закрытие карты и выполняет его распределенным на каждой опорной точки ровно один раз, избегая при этом копировать любые входные данные по сети.

The (несколько ручной) подход, который я придумал до сих пор является следующее:

public class GridBroadcastCountDemo { 

    public static void main(String[] args) throws GridException { 
     try (Grid grid = GridGain.start(CONFIG_FILE)) { 

      GridFuture<Collection<Integer>> future = grid.forRemotes().compute().broadcast(new GridCallable<Integer>() { 
       @Override 
       public Integer call() throws Exception { 
        GridCache<Integer, float[]> cache = grid.cache(CACHE_NAME); 
        int count = 0; 
        for (float[] array : cache.primaryValues()) { 
         count += array.length; 
        } 
        return count; 
       } 
      }); 

      int totalCount = 0; 
      for (int count : future.get()) { 
       totalCount += count; 
      } 
      // expect size of input data 
      System.out.println(totalCount); 
     } 
    } 
} 

Там нет, однако, никакой гарантии, что каждый элемент данных обрабатывается ровно один раз с таким подходом. Например. когда выполняется повторная балансировка во время выполнения GridCallable s, часть данных может обрабатываться как ноль, так и несколько раз.

ответ

1

С открытым исходным кодом GridGain (который сейчас Apache Ignite) имеет API ComputeTask, который имеет как методы map(), так и reduce(). Если вы ищете метод reduce(), то ComputeTask определенно является правильным API для вас.

На данный момент ваша реализация в порядке. Apache Ignite добавляет функцию, где узел не будет считаться основным, пока миграция не будет полностью завершена. Он должен скоро появиться.

+0

Приятно знать, что будет функция, которая гарантирует, что ключи не считаются первичными на нескольких узлах во время перебалансировки. Однако ошибочная обработка нескольких ключей несколько раз не является проблемой (кроме некоторых накладных расходов), поскольку дубликаты легко обнаруживаются и отбрасываются во время шага уменьшения. Противоположный случай был бы действительно проблематичным, если бы какие-либо ключи не обрабатывались из-за перебалансировки. Возможно ли, что это происходит, или GridGain предоставляет любую гарантию того, что в любой момент есть как минимум одна копия каждого первичного ключа? –

+0

@ QwertZuiopü Противоположный случай невозможно. GridGain гарантирует, что есть хотя бы одна копия ключа. – Dmitriy

+0

@ QwertZuiopü Кстати, вы можете отслеживать функцию отложенных назначений первичных ключей до завершения предварительной загрузки: https://issues.apache.org/jira/browse/IGNITE-324 – Dmitriy

 Смежные вопросы

  • Нет связанных вопросов^_^