2016-08-25 10 views
2

У меня проблема с крупномасштабной обработкой данных, которую я пытаюсь решить с помощью Apache Spark в Java. Мой вход представляет собой большой набор относительно небольших пользовательских объектов Java.Различные редукторы по ключу в Apache Spark

Мой шаг карты выполняет незначительные изменения для каждого объекта. Как только это будет сделано, он идентифицирует один или несколько классов эквивалентности, к которым принадлежит объект. В сочетании, может быть много миллиардов пар класса/объектов эквивалентности.

Моя проблема в том, что мне нужно выполнять разные операции над элементами каждого класса эквивалентности. Этот проект должен поддерживать архитектуру подключаемого модуля, поэтому я не знаю, что такое классы эквивалентности, или различные операции, которые должны выполняться для каждого класса.

Моя интуиция использовать что-то вроде следующего:

//Get the input set. 
JavaRDD<MyType> input = ... //Not important 

//Transform the input into (Equivalence Class, MyType) pairs, 
//using strings to store the equivalence class. 
JavaPairRDD<String, MyType> classedInput = input.flatMapToPair(

    new PairFlatMapFunction<MyType, String, MyType>() { 

     Iterator<Tuple2<String, MyType>> call(MyType arg) { 

      List<Tuple2<String, MyType>> out = new ArrayList<>(); 

      //Compute equivalence classes for arg. 
      for(String eqClz: getEquivalenceClasses(arg)) { 
       out.add(new Tuple2<String, MyType>(equClz, arg)); 
      } 

      return out.iterator(); 
     } 
}); 

//Collapse the results for each equivalence class. 
JavaPairRDD<String, MyType> output = classedInput.reduceByKey(

    new Function2<MyType, MyType, MyType>() { 

     MyType call(MyType a, MyType b) { 
      String eqClz = ??? //<= Problem 
      List<MyModule> modules = MyFramework.getModulesForEqClz(eqClz); 
      for(MyModule m: modules) { 
       a = m.merge(a, b); 
      } 
      return a; 
     } 
    } 

); 

Я хотел бы быть в состоянии пройти класс эквивалентности в функцию для reduceByKey, для того, чтобы использовать его, чтобы определить, какие модули должны быть вызывается. Проблема в том, что ни одна из функциональных функций объединителя Spark, которую я могу найти, не может передать ключ в свои обратные вызовы.

Из-за размера classedInput я хотел бы избежать сохранения ключа с помощью объекта MyType или добавления слишком большого количества дополнительных распределенных операций после карты.

Есть ли более искрообразный способ выполнить то, что я пытаюсь?

ответ

0

Похоже, ваша проблема обратная задача . И это можно решить с помощью обратного решения, я думаю (ниже в 2.).

  1. Один из способов заключается в использовании reduce функции (или ее более полную версию, aggregate), которая просто требует ассоциативную операцию для вас суммировать результаты ваших данных, независимо от ключа. Но выражение деталей вашей группировки элементов в одном классе эквивалентности может быть немного сложным.
  2. Чем проще способ сохранить ссылку на класс эквивалентности, против которой элементы были подобраны, чтобы просто повторить класс эквивалентности в значении:

Tuple2<String, MyType> outValue = new Tuple2<String, MyType>(eqClz, arg); out.add(new Tuple2<String, Tuple2<String, MyType>>(equClz, outValue));

Если, как вы упомянули в комментарии , вы беспокоитесь о размере перетасовки при передаче своих данных, возможно, то, что вы хотите ограничить, - это размер представления, используемого в качестве конструкции манипуляции. Я имею в виду, что повторение ключа в значении, как было предложено выше, приводит к двум копиям переменной equClz. Но если это десяток байтов, тот, размер которого вы хотите уменьшить, является копией в ключевой позиции. Для этого вы можете выбрать некритичный хэш правильной длины.

Вы упомянули, что еще дюжина байтов на запись привела бы к гигабайтам больше данных, что должно означать, что у вас всего несколько сотен миллионов записей и, следовательно, самое большее несколько сотен миллионов значений «equClz». Это легко покрывается 32-битным некриптографическим хешем (вы легко найдете реализации этих, Murmur3, XXHash). Так как 32 бита составляют 4 байта, это должно сократить накладные расходы на передачу, по крайней мере, на порядок.

+0

Чтобы начать, отличные ответы. В моем конкретном случае использования я беспокоюсь о потреблении памяти и сети этих решений. Для 1., мое понимание Spark's .reduce() заключается в том, что он возвращает единственный результат вызывающему.Учитывая количество отдельных артефактов, я бы хотел не собирать их все в одном месте за один раз. Для 2. Я буду беспокоиться об объединении дополнительных данных с каждым значением. Моя интуиция заключается в том, что пара десятков байтов на запись будет равна отправке гигабайт или больше дополнительных данных по проводу. Является ли это разумным, или я с базы? – PilotScape64

+0

@ PilotScape64 http://stackoverflow.com/help/someone-answers – huitseeker