2015-07-10 3 views
0

Я написал программу mapreduce, которая считывает данные из таблицы hive используя HCATLOG и записывает в HBase. Это работа с картой, без редукторов. Я запускал программу из командной строки, и она работает так, как ожидалось (создала толстую банку, чтобы избежать проблем с Jar). Я хотел интегрировать его в oozie (с помощью HUE). У меня есть два варианта, чтобы запустить егоMapreduce с интеграцией HCATALOG с oozie в MAPR

  1. Использование MapReduce Действие
  2. Использование Java Действие

Поскольку моя MapReduce программа имеет метод драйвера, который содержит код, приведенный ниже

import org.apache.hadoop.conf.*; 
import org.apache.hadoop.io.*; 
import org.apache.hadoop.mapreduce.*; 
import org.apache.hadoop.util.*; 
import org.apache.hadoop.fs.*; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
import org.apache.hive.hcatalog.data.schema.HCatSchema; 
import org.apache.hive.hcatalog.mapreduce.HCatInputFormat; 
import org.apache.hive.hcatalog.mapreduce.HCatOutputFormat; 

public class HBaseValdiateInsertDriver { 

public static void main(String[] args) throws Exception { 

    String dbName = "Test"; 
    String tableName = "emp"; 
    Configuration conf = new Configuration(); 

    args = new GenericOptionsParser(conf, args).getRemainingArgs(); 

    Job job = new Job(conf, "HBase Get Put Demo"); 
    job.setInputFormatClass(HCatInputFormat.class); 
    HCatInputFormat.setInput(job, dbName, tableName, null); 

    job.setJarByClass(HBaseValdiateInsertDriver.class); 

    job.setMapperClass(HBaseValdiateInsert.class); 
    job.setMapOutputKeyClass(Text.class); 
    job.setMapOutputValueClass(Text.class); 

    job.setNumReduceTasks(0); 
    FileInputFormat.addInputPath(job, new Path("maprfs:///user/input")); 
    FileOutputFormat.setOutputPath(job, new Path("maprfs:///user/output")); 

    job.waitForCompletion(true); 

    } 
} 

Как сделать Я указываю метод драйвера в oozie. Все, что я вижу, это указать класс сопоставления и редуктора. Может кто-нибудь мне подскажет, как установить свойства?

Использование java-действия Я могу указать свой класс драйвера как основной класс и выполнить его, но я сталкиваюсь с ошибками, такими как таблица не найдена, баны HCATLOG не найдены и т. Д. В рабочем процессе я включаю hive-site.xml (Использование Hue), но я чувствую, что система не может забрать свойства. Может ли кто-нибудь сообщить мне, что мне нужно позаботиться, есть ли какие-либо другие свойства конфигурации, которые мне нужно включить?

Также пример программы я ссылался в веб-сайт Cloudera использует

HCatInputFormat.setInput(job, InputJobInfo.create(dbName, 
       inputTableName, null)); 

, где, как я использую ниже (я не вижу способа, которые принимают вышеуказанную вход

HCatInputFormat.setInput(job, dbName, tableName, null); 

Ниже мой картографа код

import java.io.IOException; 
import java.util.ArrayList; 
import java.util.List; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.hbase.KeyValue; 
import org.apache.hadoop.hbase.client.Durability; 
import org.apache.hadoop.hbase.client.Get; 
import org.apache.hadoop.hbase.client.HTableInterface; 
import org.apache.hadoop.hbase.client.Put; 
import org.apache.hadoop.hbase.client.Result; 
import org.apache.hadoop.hbase.util.Bytes; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.io.WritableComparable; 
import org.apache.hadoop.mapreduce.Mapper; 
import org.apache.hive.hcatalog.data.HCatRecord; 



public class HBaseValdiateInsert extends Mapper<WritableComparable, HCatRecord, Text, Text> { 

    static HTableInterface table; 
    static HTableInterface inserted; 
    private String hbaseDate = null; 
    String existigValue=null; 
    List<Put> putList = new ArrayList<Put>(); 


    @Override 
    public void setup(Context context) throws IOException { 

       Configuration conf = context.getConfiguration(); 
       String tablename = "dev_arch186"; 
     Utils.getHBConnection(); 
     table = Utils.getTable(tablename); 
     table.setAutoFlushTo(false); 
    } 
    @Override 
    public void cleanup(Context context) { 
     try { 
      table.put(putList); 
      table.flushCommits(); 
      table.close(); 
     } catch (IOException e) { 

      e.printStackTrace(); 
     } 
     Utils.closeConnection(); 
    } 



    @Override 
    public void map(WritableComparable key, HCatRecord value, Context context) throws IOException, InterruptedException { 

       String name_hive = (String) value.get(0); 
       String id_hive = (String) value.get(1); 

       String rec[] = test.toString().split(","); 
     Get g = new Get(Bytes.toBytes(name_hive)); 

     existigValue=getOneRecord(Bytes.toBytes("Info"),Bytes.toBytes("name"),name_hive); 
     if (existigValue.equalsIgnoreCase("NA") || !existigValue.equalsIgnoreCase(id_hive)) { 
      Put put = new Put(Bytes.toBytes(rec[0])); 
      put.add(Bytes.toBytes("Info"), 
        Bytes.toBytes("name"), 
        Bytes.toBytes(rec[1])); 
      put.setDurability(Durability.SKIP_WAL); 
      putList.add(put); 
      if(putList.size()>25000){ 
       table.put(putList); 
       table.flushCommits(); 
      } 
     } 

    } 


    public String getOneRecord(byte[] columnFamily, byte[] columnQualifier, String rowKey) 
      throws IOException { 
     Get get = new Get(rowKey.getBytes()); 
     get.setMaxVersions(1); 
     Result rs = table.get(get); 
     rs.getColumn(columnFamily, columnQualifier); 
     System.out.println(rs.containsColumn(columnFamily, columnQualifier)); 
     KeyValue result = rs.getColumnLatest(columnFamily,columnQualifier); 

     if (rs.containsColumn(columnFamily, columnQualifier)) 
      return (Bytes.toString(result.getValue())); 
     else 
      return "NA"; 
    } 

    public boolean columnQualifierExists(String tableName, String ColumnFamily, 
      String ColumnQualifier, String rowKey) throws IOException { 
     Get get = new Get(rowKey.getBytes()); 
     Result rs = table.get(get); 
     return(rs.containsColumn(ColumnFamily.getBytes(),ColumnQualifier.getBytes())); 
    } 

} 

Примечание: Я использую MapR (M3) Кластер с HUE, как го e для oozie. Hive Версия: 1-0 HCAT Версия: 1-0

ответ

0

Я не мог найти способ инициализировать HCatInputFormat из действия Oozie mapreduce. Но у меня есть обходное решение, как показано ниже.

Создан LazyHCatInputFormat, расширяя HCatInputFormat. Переопределите метод getJobInfo для обработки инициализации. Это будет вызвано как часть вызова getSplits (..).

private static void lazyInit(Configuration conf){ 
    try{ 

     if(conf==null){ 
      conf = new Configuration(false); 
     } 
     conf.addResource(new Path(System.getProperty("oozie.action.conf.xml"))); 
     conf.addResource(new org.apache.hadoop.fs.Path("hive-config.xml")); 

     String databaseName = conf.get("LazyHCatInputFormat.databaseName"); 
     String tableName = conf.get("LazyHCatInputFormat.tableName"); 
     String partitionFilter = conf.get("LazyHCatInputFormat.partitionFilter"); 

     setInput(conf, databaseName, tableName); 
     //setFilter(partitionFilter); 

     //System.out.println("After lazyinit : "+conf.get("mapreduce.lib.hcat.job.info")); 
    }catch(Exception e){ 
     System.out.println("*** LAZY INIT FAILED ***"); 
     //e.printStackTrace(); 
    } 
} 

public static InputJobInfo getJobInfo(Configuration conf) 
     throws IOException { 
    String jobString = conf.get("mapreduce.lib.hcat.job.info"); 
    if (jobString == null) { 
     lazyInit(conf); 
     jobString = conf.get("mapreduce.lib.hcat.job.info"); 
     if(jobString == null){ 
      throw new IOException("job information not found in JobContext. HCatInputFormat.setInput() not called?"); 
     } 
    } 
    return (InputJobInfo) HCatUtil.deserialize(jobString); 
} 

В действии oozie map-redcue, сконфигурированном как показано ниже.

   <property> 
       <name>mapreduce.job.inputformat.class</name> 
       <value>com.xyz.LazyHCatInputFormat</value> 
      </property> 
      <property> 
       <name>LazyHCatInputFormat.databaseName</name> 
       <value>HCAT DatabaseNameHere</value> 
      </property> 
      <property> 
       <name>LazyHCatInputFormat.tableName</name> 
       <value>HCAT TableNameHere</value> 
      </property> 

Это может быть не лучшая реализация, а быстрый взлом, чтобы заставить его работать.