У меня проблема с крупномасштабной обработкой данных, которую я пытаюсь решить с помощью Apache Spark в Java. Мой вход представляет собой большой набор относительно небольших пользовательских объектов Java.Различные редукторы по ключу в Apache Spark
Мой шаг карты выполняет незначительные изменения для каждого объекта. Как только это будет сделано, он идентифицирует один или несколько классов эквивалентности, к которым принадлежит объект. В сочетании, может быть много миллиардов пар класса/объектов эквивалентности.
Моя проблема в том, что мне нужно выполнять разные операции над элементами каждого класса эквивалентности. Этот проект должен поддерживать архитектуру подключаемого модуля, поэтому я не знаю, что такое классы эквивалентности, или различные операции, которые должны выполняться для каждого класса.
Моя интуиция использовать что-то вроде следующего:
//Get the input set.
JavaRDD<MyType> input = ... //Not important
//Transform the input into (Equivalence Class, MyType) pairs,
//using strings to store the equivalence class.
JavaPairRDD<String, MyType> classedInput = input.flatMapToPair(
new PairFlatMapFunction<MyType, String, MyType>() {
Iterator<Tuple2<String, MyType>> call(MyType arg) {
List<Tuple2<String, MyType>> out = new ArrayList<>();
//Compute equivalence classes for arg.
for(String eqClz: getEquivalenceClasses(arg)) {
out.add(new Tuple2<String, MyType>(equClz, arg));
}
return out.iterator();
}
});
//Collapse the results for each equivalence class.
JavaPairRDD<String, MyType> output = classedInput.reduceByKey(
new Function2<MyType, MyType, MyType>() {
MyType call(MyType a, MyType b) {
String eqClz = ??? //<= Problem
List<MyModule> modules = MyFramework.getModulesForEqClz(eqClz);
for(MyModule m: modules) {
a = m.merge(a, b);
}
return a;
}
}
);
Я хотел бы быть в состоянии пройти класс эквивалентности в функцию для reduceByKey, для того, чтобы использовать его, чтобы определить, какие модули должны быть вызывается. Проблема в том, что ни одна из функциональных функций объединителя Spark, которую я могу найти, не может передать ключ в свои обратные вызовы.
Из-за размера classedInput я хотел бы избежать сохранения ключа с помощью объекта MyType или добавления слишком большого количества дополнительных распределенных операций после карты.
Есть ли более искрообразный способ выполнить то, что я пытаюсь?
Чтобы начать, отличные ответы. В моем конкретном случае использования я беспокоюсь о потреблении памяти и сети этих решений. Для 1., мое понимание Spark's .reduce() заключается в том, что он возвращает единственный результат вызывающему.Учитывая количество отдельных артефактов, я бы хотел не собирать их все в одном месте за один раз. Для 2. Я буду беспокоиться об объединении дополнительных данных с каждым значением. Моя интуиция заключается в том, что пара десятков байтов на запись будет равна отправке гигабайт или больше дополнительных данных по проводу. Является ли это разумным, или я с базы? – PilotScape64
@ PilotScape64 http://stackoverflow.com/help/someone-answers – huitseeker