2016-08-22 1 views
3

Я хочу получить строки из HBase, используя фильтр типа QualiferFilter в python-api.
Я знаю, как получить строки из HBase, например, под кодом.Spark: Как использовать фильтр HBase, например, QualiferFilter by python-api

host = 'localhost' 
keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter" 
valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter" 
conf = {"hbase.zookeeper.quorum": host, "hbase.mapreduce.inputtable": "user", 
       "hbase.mapreduce.scan.columns": "u:uid", 
       "hbase.mapreduce.scan.row.start": "1", "hbase.mapreduce.scan.row.stop": "100"} 
rdd = sc.newAPIHadoopRDD("org.apache.hadoop.hbase.mapreduce.TableInputFormat", 
          "org.apache.hadoop.hbase.io.ImmutableBytesWritable", 
         "org.apache.hadoop.hbase.client.Result", 
         keyConverter=keyConv, valueConverter=valueConv, conf=conf) 

Но, я также хочу получить строки с использованием фильтров.
Какие коды необходимо добавить?

+0

вы выяснить, как это сделать? – void

+0

Я не мог найти решение. В конце концов, я использую Scala API. Я думаю, что Python API еще нельзя использовать для производственной среды. – penlight

ответ

0

Здравствуйте, вы можете проверить этот код ................

def doYourStuff(row): 
    text = row.split("\n") 
    data = {} 
    for row in text: 
     if json.loads(row)["qualifier"] == "message": 
       data["message"] = json.loads(row)["value"] 
     if json.loads(row)["qualifier"] == "domain": 
       data["domain"] = json.loads(row)["value"] 
     data["rowKey"] = json.loads(row)["row"] 
     return DoWhatYouWantToDo(data) 

    def save_record(rdd): 
     host = '[email protected]@[email protected]@' 
     table = 'TableName' 
     keyConv1 = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter" 
     valueConv1 = "org.apache.spark.examples.pythonconverters.StringListToPutConverter" 
     conf = {"hbase.zookeeper.quorum": host, 
       "hbase.mapred.outputtable": table, 
       "mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat", 
       "mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable", 
       "mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"} 
     rdd.saveAsNewAPIHadoopDataset(
      keyConverter=keyConv1, valueConverter=valueConv1,conf=conf) 


    hbaseRdd = hbaseRdd.map(lambda x: x[1]) # message_rdd = hbase_rdd.map(lambda x:x[0]) will give only row-key 

    processedRdd = hbaseRdd.map(lambda x: doYourStuff(x)) 
    save_record(processedRdd)