
Developing a Hive UDAF, hitting a ClassCastException with no idea why

```java
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFParameterInfo;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver2;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.util.StringUtils;

public class GenericUdafMemberLevel implements GenericUDAFResolver2 {
    private static final Log LOG = LogFactory
        .getLog(GenericUdafMemberLevel.class.getName());

@Override 
public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo paramInfo) 
     throws SemanticException { 
    return new GenericUdafMeberLevelEvaluator(); 
} 

@Override 
//validate the arguments
public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters) 
     throws SemanticException { 
    if (parameters.length != 2) { // check the number of arguments
     throw new UDFArgumentTypeException(parameters.length - 1, 
       "Exactly two arguments are expected."); 
    } 
    //the arguments must be primitive types; complex types are not accepted
    if (parameters[0].getCategory() != ObjectInspector.Category.PRIMITIVE) { 
     throw new UDFArgumentTypeException(0, 
       "Only primitive type arguments are accepted but " 
         + parameters[0].getTypeName() + " is passed."); 
    } 

    if (parameters[1].getCategory() != ObjectInspector.Category.PRIMITIVE) { 
     throw new UDFArgumentTypeException(1, 
       "Only primitive type arguments are accepted but " 
         + parameters[1].getTypeName() + " is passed."); 
    } 

    return new GenericUdafMeberLevelEvaluator(); 
} 

public static class GenericUdafMeberLevelEvaluator extends GenericUDAFEvaluator { 
    private PrimitiveObjectInspector inputOI; 
    private PrimitiveObjectInspector inputOI2; 
    private DoubleWritable result; 

    @Override 
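    // init() is called once per mode; here the input inspectors are only
    // captured for the map-side modes (PARTIAL1/COMPLETE).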
    public ObjectInspector init(Mode m, ObjectInspector[] parameters) 
      throws HiveException { 
     super.init(m, parameters); 
     if (m == Mode.PARTIAL1 || m == Mode.COMPLETE){ 
      inputOI = (PrimitiveObjectInspector) parameters[0]; 
      inputOI2 = (PrimitiveObjectInspector) parameters[1]; 
      result = new DoubleWritable(0); 
     } 
     return PrimitiveObjectInspectorFactory.writableLongObjectInspector; 
    } 

    /** Aggregation buffer storing the running sum. */
    static class SumAgg implements AggregationBuffer { 
     boolean empty; 
     double value; 
    } 

    @Override 
    //Allocate the memory a new aggregation needs; it stores the running sum
    //produced during the mapper, combiner and reducer stages.
    //Clear the buffer before using it by calling reset().
    public AggregationBuffer getNewAggregationBuffer() throws HiveException { 
     SumAgg buffer = new SumAgg(); 
     reset(buffer); 
     return buffer; 
    } 

    @Override 
    //Reset the sum to 0.
    //MapReduce reuses mapper and reducer objects, so for compatibility the
    //buffer memory must be reusable as well.
    public void reset(AggregationBuffer agg) throws HiveException { 
     ((SumAgg) agg).value = 0.0; 
     ((SumAgg) agg).empty = true; 
    } 

    private boolean warned = false; 
    //Iterate: called in the map phase for every input row; just add the
    //input value to the running sum held in agg.
    @Override 
    public void iterate(AggregationBuffer agg, Object[] parameters) 
      throws HiveException { 
     // parameters == null means the input table/split is empty 
     if (parameters == null) { 
      return; 
     } 
     try {
      double flag = PrimitiveObjectInspectorUtils.getDouble(parameters[1], inputOI2);
      if (flag > 1.0) { // the condition col2 must satisfy
       merge(agg, parameters[0]); // reuse merge() so the combiner can fold in the map-side sums
      }
     } catch (NumberFormatException e) {
      if (!warned) {
       warned = true;
       LOG.warn(getClass().getSimpleName() + " "
        + StringUtils.stringifyException(e));
      }
     }

    } 

    @Override 
    //The combiner merges the results returned by the mappers, and the
    //reducer merges the results from the mappers or combiners.
    public void merge(AggregationBuffer agg, Object partial) 
      throws HiveException { 
     if (partial != null) { 
      //fetch each field's value through the ObjectInspector
      double p = PrimitiveObjectInspectorUtils.getDouble(partial, inputOI); 
      ((SumAgg) agg).value += p; 
     } 
    } 


    @Override 
    //The reducer returns the result; when there is only a mapper and no
    //reducer, the result is returned on the map side.
    public Object terminatePartial(AggregationBuffer agg) 
      throws HiveException { 
     return terminate(agg); 
    } 

    @Override 
    public Object terminate(AggregationBuffer agg) throws HiveException { 
     result.set(((SumAgg) agg).value); 
     return result; 
    } 
} 

}
```

I used some Chinese comments in the code to help myself understand the theory. The idea of the UDAF is this: `select test_sum(col1, col2) from tbl;` should sum the values of col1 for the rows where col2 satisfies some condition. Most of the code is copied from the official avg() UDAF.

I ran into a weird exception:

```
java.lang.RuntimeException: Hive Runtime Error while closing operators
    at org.apache.hadoop.hive.ql.exec.ExecMapper.close(ExecMapper.java:226)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:57)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:436)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:372)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1136)
    at org.apache.hadoop.mapred.Child.main(Child.java:249)
Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.ClassCastException: org.apache.hadoop.io.DoubleWritable cannot be cast to org.apache.hadoop.io.LongWritable
    at org.apache.hadoop.hive.ql.exec.GroupByOperator.closeOp(GroupByOperator.java:1132)
    at org.apache.hadoop.hive.ql.exec.Operator.close(Operator.java:558)
    at org.apache.hadoop.hive.ql.exec.Operator.close(Operator.java:567)
    at org.apache.hadoop.hive.ql.exec.Operator.close(Operator.java:567)
    at org.apache.hadoop.hive.ql.exec.Operator.close(Operator.java:567)
    at org.apache.hadoop.hive.ql.exec.ExecMapper.close(ExecMapper.java:193)
    ... 8 more
Caused by: java.lang.ClassCastException: org.apache.hadoop.io.DoubleWritable cannot be cast to org.apache.hadoop.io.LongWritable
    at org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableLongObjectInspector.get(WritableLongObjectInspector.java:35)
    at org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe.serialize(LazyBinarySerDe.java:323)
    at org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe.serializeStruct(LazyBinarySerDe.java:255)
    at org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe.serialize(LazyBinarySerDe.java:202)
    at org.apache.hadoop.hive.ql.exec.ReduceSinkOperator.processOp(ReduceSinkOperator.java:236)
    at org.apache.hadoop.hive.ql.exec.Operator.process(Operator.java:474)
    at org.apache.hadoop.hive.ql.exec.Operator.forward(Operator.java:800)
    at org.apache.hadoop.hive.ql.exec.GroupByOperator.forward(GroupByOperator.java:1061)
    at org.apache.hadoop.hive.ql.exec.GroupByOperator.closeOp(GroupByOperator.java:1113)
    ... 13 more
```

Is there something wrong with my UDAF? Please kindly point it out. Thanks a lot.

Answer


Replace PrimitiveObjectInspectorFactory.writableLongObjectInspector in the init method with PrimitiveObjectInspectorFactory.writableDoubleObjectInspector. Your terminate() and terminatePartial() return a DoubleWritable, but init() declares a long ObjectInspector, so the serializer tries to read the value as a LongWritable, which is exactly the ClassCastException in the stack trace.
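A minimal sketch of the corrected init(), assuming the rest of the evaluator stays as posted. Note that with writableDoubleObjectInspector the result field must be Hive's org.apache.hadoop.hive.serde2.io.DoubleWritable (the class that inspector reads), not Hadoop's org.apache.hadoop.io.DoubleWritable that shows up in your stack trace:

```java
import org.apache.hadoop.hive.serde2.io.DoubleWritable; // Hive's DoubleWritable, not Hadoop's

@Override
public ObjectInspector init(Mode m, ObjectInspector[] parameters)
        throws HiveException {
    super.init(m, parameters);
    if (m == Mode.PARTIAL1 || m == Mode.COMPLETE) {
        inputOI = (PrimitiveObjectInspector) parameters[0];
        inputOI2 = (PrimitiveObjectInspector) parameters[1];
    }
    result = new DoubleWritable(0);
    // The declared inspector must match what terminate()/terminatePartial() return.
    return PrimitiveObjectInspectorFactory.writableDoubleObjectInspector;
}
```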


I have already fixed that typo, but now I get a NullPointerException when executing the UDAF. Can you give an example of the init() part of a UDAF? – user2114243
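A sketch of a mode-aware init(), under the assumption that the NullPointerException comes from merge() reading through inputOI while the posted init() assigns the inspectors only for PARTIAL1/COMPLETE; in PARTIAL2/FINAL the single element of parameters is the inspector for the partial aggregate emitted by terminatePartial(), and it needs to be captured as well:

```java
@Override
public ObjectInspector init(Mode m, ObjectInspector[] parameters)
        throws HiveException {
    super.init(m, parameters);
    result = new DoubleWritable(0); // Hive's serde2.io.DoubleWritable, as above
    if (m == Mode.PARTIAL1 || m == Mode.COMPLETE) {
        // Raw input rows: (col1, col2).
        inputOI = (PrimitiveObjectInspector) parameters[0];
        inputOI2 = (PrimitiveObjectInspector) parameters[1];
    } else {
        // PARTIAL2/FINAL: the single parameter is the partial aggregate
        // emitted by terminatePartial(), so merge() can read it via inputOI.
        inputOI = (PrimitiveObjectInspector) parameters[0];
    }
    return PrimitiveObjectInspectorFactory.writableDoubleObjectInspector;
}
```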