2014-12-17 3 views
3

В настоящее время я оцениваю Apache Crunch. Я последовал за простым WordCount MapReduce job example: Впоследствии я стараюсь сохранить результаты в автономном HBase. HBase работает (проверено с JPS и HBase оболочки), как описано здесь: http://hbase.apache.org/book/quickstart.htmlWordCount с Apache Crunch в HBase Standalone

Теперь я принимаю пример для записи в HBase:

Pipeline pipeline = new MRPipeline(WordCount.class,getConf()); 
PCollection<String> lines = pipeline.readTextFile(inputPath); 
PTable<String,Long> counts = noStopWords.count(); 
pipeline.write(counts, new HBaseTarget("wordCountOutTable"); 
PipelineResult result = pipeline.done(); 

я получаю исключение: «исключение: java.lang.IllegalArgumentException : HBaseTarget поддерживает только Put и Delete "

Какие-нибудь подсказки, что пошло не так?

ответ

3

PTable может быть PCollection, но HBaseTarget может обрабатывать объекты Put или Delete. Поэтому вам нужно преобразовать PTable в PCollection, где каждый элемент коллекции является либо Put, либо Delete. Посмотрите на Crunch-Examples, где это делается.

Пример преобразования может выглядеть следующим образом:

public PCollection<Put> createPut(final PTable<String, String> counts) { 
    return counts.parallelDo("Convert to puts", new DoFn<Pair<String, String>, Put>() { 
    @Override 
    public void process(final Pair<String, String> input, final Emitter<Put> emitter) { 
     Put put; 
     // input.first is used as row key 
     put = new Put(Bytes.toBytes(input.first())); 
     // the value (input.second) is added with its family and qualifier 
     put.add(COLUMN_FAMILY_TARGET, COLUMN_QUALIFIER_TARGET_TEXT, Bytes.toBytes(input.second())); 
     emitter.emit(put); 
    } 
    }, Writables.writables(Put.class)); 
}