2016-12-09 9 views
7

У меня возникли проблемы с пониманием того, как Spark взаимодействует с хранилищем.Интеграция базы данных ключ-значение с Spark

Я хотел бы создать кластер Spark, который извлекает данные из базы данных RocksDB (или любого другого хранилища значений ключа). Тем не менее, в этот момент лучшее, что я могу сделать, это собрать весь набор данных из базы данных в память в каждом узле кластера (например, на карте) и построить RDD из этого объекта.

Что мне нужно сделать, чтобы извлечь только необходимые данные (например, Spark с HDFS)? Я читал о Hadoop Input Format и Record Readers, но я не совсем понимаю, что я должен реализовать.

Я знаю, что это довольно широкий вопрос, но я бы очень признателен за помощь, чтобы начать меня. Заранее спасибо.

ответ

4

Это одно из возможных решений. Я предполагаю, что у вас есть клиентская библиотека для хранилища ключей (RocksDB в вашем случае), к которой вы хотите получить доступ.
KeyValuePair представляет собой класс bean-компонента, представляющий одну пару Key-value из вашего хранилища ключей.

Классы

/*Lazy iterator to read from KeyValue store*/ 
class KeyValueIterator implements Iterator<KeyValuePair> { 
    public KeyValueIterator() { 
     //TODO initialize your custom reader using java client library 
    } 
    @Override 
    public boolean hasNext() { 
     //TODO 
    } 

    @Override 
    public KeyValuePair next() { 
     //TODO 
    } 
} 
class KeyValueReader implements FlatMapFunction<KeyValuePair, KeyValuePair>() { 
    @Override 
    public Iterator<KeyValuePair> call(KeyValuePair keyValuePair) throws Exception { 
     //ignore empty 'keyValuePair' object 
     return new KeyValueIterator(); 
    } 
} 

Создать KeyValue RDD

/*list with a dummy KeyValuePair instance*/ 
ArrayList<KeyValuePair> keyValuePairs = new ArrayList<>(); 
keyValuePairs.add(new KeyValuePair()); 
JavaRDD<KeyValuePair> keyValuePairRDD = javaSparkContext.parallelize(keyValuePairs); 
/*Read one key-value pair at a time lazily*/  
keyValuePairRDD = keyValuePairRDD.flatMap(new KeyValueReader()); 

Примечание:

Над раствором создает RDD с двумя разделами по умолчанию (один из них будет пусто). Увеличьте разделы перед применением любого преобразования на keyValuePairRDD, чтобы распределить обработку между исполнителями. Различные способы увеличения разделов:

keyValuePairRDD.repartition(partitionCounts) 
//OR 
keyValuePairRDD.partitionBy(...) 
+0

ли это позволяют для каждого узла искрового только получать данные, которые он собирается обрабатывать? – PablodeAcero

+0

Да. Я отредактировал ответ с запиской (спасибо за то, что вы заметили это сомнение). Увеличивая количество разделов, данные будут распределены между исполнителями. И каждый исполнитель получает только данные, необходимые для обработки. – code

+0

Спасибо за ответ. Хотя это не тот ответ, который я искал, я обязательно попробую это решение. – PablodeAcero