2014-09-23 2 views
0

Независимо от того, насколько я могу сделать compareTo моего сложного ключа, я не получаю ожидаемых результатов. За исключением случаев, когда я использую один ключ, который будет одинаковым для каждой записи, он будет соответствующим образом уменьшен до одной записи. Я также видел, что это происходит только тогда, когда я обрабатываю полную нагрузку, если я отламываю несколько записей, которые не уменьшали и запускали их в гораздо меньшем масштабе, эти записи объединяются.Map-Уменьшить не уменьшая, как ожидалось, со сложными ключами и значениями

Сумма выходных записей верна, но есть дублирование на уровне записи элементов, которые я ожидал бы вместе. Поэтому, когда я ожидаю, что 500 записей суммируются до 5000, я получаю 1232 записи суммирования до 5000 с очевидными записями, которые должны были быть сведены в один.

Я читал о проблемах с объектными ссылками и сложными ключами и значениями, но я не вижу нигде, что у меня есть потенциал для этого. Для этого вы найдете места, в которых я создаю новые объекты, которые мне, вероятно, не нужны, но я пытаюсь все на этом этапе и набираю его, когда он работает.

У меня нет идей о том, что попробовать или где и как тыкать, чтобы понять это. Пожалуйста помоги!

public static class Map extends 
     Mapper<LongWritable, Text, IMSTranOut, IMSTranSums> { 

    //private SimpleDateFormat dtFormat = new SimpleDateFormat("yyyyddd"); 

    @Override 
    public void map(LongWritable key, Text value, Context context) 
      throws IOException, InterruptedException { 
     String line = value.toString(); 
     SimpleDateFormat dtFormat = new SimpleDateFormat("yyyyddd"); 

     IMSTranOut dbKey = new IMSTranOut(); 
     IMSTranSums sumVals = new IMSTranSums(); 

     String[] tokens = line.split(",", -1); 

     dbKey.setLoadKey(-99); 
     dbKey.setTranClassKey(-99); 

     dbKey.setTransactionCode(tokens[0]); 
     dbKey.setTransactionType(tokens[1]); 
     dbKey.setNpaNxx(getNPA(dbKey.getTransactionCode())); 

     try { 
      dbKey.setTranDate(new Date(dtFormat.parse(tokens[2]).getTime())); 
     } catch (ParseException e) { 

     }// 2 

     dbKey.setTranHour(getTranHour(tokens[3])); 

     try { 
      dbKey.setStartDate(new Date(dtFormat.parse(tokens[4]).getTime())); 
     } catch (ParseException e) { 
     }// 4 

     dbKey.setStartHour(getTranHour(tokens[5])); 

     try { 
      dbKey.setStopDate(new Date(dtFormat.parse(tokens[6]).getTime())); 
     } catch (ParseException e) { 
     }// 6 

     dbKey.setStopHour(getTranHour(tokens[7])); 

     sumVals.setTranCount(1); 
     sumVals.setInputQTime(Double.parseDouble(tokens[8])); 
     sumVals.setElapsedTime(Double.parseDouble(tokens[9])); 
     sumVals.setCpuTime(Double.parseDouble(tokens[10])); 

     context.write(dbKey, sumVals); 
    } 

} 

public static class Reduce extends 
     Reducer<IMSTranOut, IMSTranSums, IMSTranOut, IMSTranSums> { 

    @Override 
    public void reduce(IMSTranOut key, Iterable<IMSTranSums> values, 
      Context context) throws IOException, InterruptedException { 

     int tranCount = 0; 
     double inputQ = 0; 
     double elapsed = 0; 
     double cpu = 0; 

     for (IMSTranSums val : values) { 
      tranCount += val.getTranCount(); 
      inputQ += val.getInputQTime(); 
      elapsed += val.getElapsedTime(); 
      cpu += val.getCpuTime(); 
     } 

     IMSTranSums sumVals=new IMSTranSums(); 
     IMSTranOut dbKey=new IMSTranOut(); 

     sumVals.setCpuTime(inputQ); 
     sumVals.setElapsedTime(elapsed); 
     sumVals.setInputQTime(cpu); 
     sumVals.setTranCount(tranCount); 

     dbKey.setLoadKey(key.getLoadKey()); 
     dbKey.setTranClassKey(key.getTranClassKey()); 
     dbKey.setNpaNxx(key.getNpaNxx()); 
     dbKey.setTransactionCode(key.getTransactionCode()); 
     dbKey.setTransactionType(key.getTransactionType()); 
     dbKey.setTranDate(key.getTranDate()); 
     dbKey.setTranHour(key.getTranHour()); 
     dbKey.setStartDate(key.getStartDate()); 
     dbKey.setStartHour(key.getStartHour()); 
     dbKey.setStopDate(key.getStopDate()); 
     dbKey.setStopHour(key.getStopHour()); 

     dbKey.setInputQTime(inputQ); 
     dbKey.setElapsedTime(elapsed); 
     dbKey.setCpuTime(cpu); 
     dbKey.setTranCount(tranCount); 

     context.write(dbKey, sumVals); 
    } 
} 

Вот реализация DBWritable класса:

public class IMSTranOut implements DBWritable, 
    WritableComparable<IMSTranOut> { 

private int loadKey; 
private int tranClassKey; 
private String npaNxx; 
private String transactionCode; 
private String transactionType; 
private Date tranDate; 
private double tranHour; 
private Date startDate; 
private double startHour; 
private Date stopDate; 
private double stopHour; 
private double inputQTime; 
private double elapsedTime; 
private double cpuTime; 
private int tranCount; 

public void readFields(ResultSet rs) throws SQLException { 
    setLoadKey(rs.getInt("LOAD_KEY")); 
    setTranClassKey(rs.getInt("TRAN_CLASS_KEY")); 
    setNpaNxx(rs.getString("NPA_NXX")); 
    setTransactionCode(rs.getString("TRANSACTION_CODE")); 
    setTransactionType(rs.getString("TRANSACTION_TYPE")); 
    setTranDate(rs.getDate("TRAN_DATE")); 
    setTranHour(rs.getInt("TRAN_HOUR")); 
    setStartDate(rs.getDate("START_DATE")); 
    setStartHour(rs.getInt("START_HOUR")); 
    setStopDate(rs.getDate("STOP_DATE")); 
    setStopHour(rs.getInt("STOP_HOUR")); 
    setInputQTime(rs.getInt("INPUT_Q_TIME")); 
    setElapsedTime(rs.getInt("ELAPSED_TIME")); 
    setCpuTime(rs.getInt("CPU_TIME")); 
    setTranCount(rs.getInt("TRAN_COUNT")); 
} 

public void write(PreparedStatement ps) throws SQLException { 
    ps.setInt(1, loadKey); 
    ps.setInt(2, tranClassKey); 
    ps.setString(3, npaNxx); 
    ps.setString(4, transactionCode); 
    ps.setString(5, transactionType); 
    ps.setDate(6, tranDate); 
    ps.setDouble(7, tranHour); 
    ps.setDate(8, startDate); 
    ps.setDouble(9, startHour); 
    ps.setDate(10, stopDate); 
    ps.setDouble(11, stopHour); 
    ps.setDouble(12, inputQTime); 
    ps.setDouble(13, elapsedTime); 
    ps.setDouble(14, cpuTime); 
    ps.setInt(15, tranCount); 
} 

public int getLoadKey() { 
    return loadKey; 
} 

public void setLoadKey(int loadKey) { 
    this.loadKey = loadKey; 
} 

public int getTranClassKey() { 
    return tranClassKey; 
} 

public void setTranClassKey(int tranClassKey) { 
    this.tranClassKey = tranClassKey; 
} 

public String getNpaNxx() { 
    return npaNxx; 
} 

public void setNpaNxx(String npaNxx) { 
    this.npaNxx = new String(npaNxx); 
} 

public String getTransactionCode() { 
    return transactionCode; 
} 

public void setTransactionCode(String transactionCode) { 
    this.transactionCode = new String(transactionCode); 
} 

public String getTransactionType() { 
    return transactionType; 
} 

public void setTransactionType(String transactionType) { 
    this.transactionType = new String(transactionType); 
} 

public Date getTranDate() { 
    return tranDate; 
} 

public void setTranDate(Date tranDate) { 
    this.tranDate = new Date(tranDate.getTime()); 
} 

public double getTranHour() { 
    return tranHour; 
} 

public void setTranHour(double tranHour) { 
    this.tranHour = tranHour; 
} 

public Date getStartDate() { 
    return startDate; 
} 

public void setStartDate(Date startDate) { 
    this.startDate = new Date(startDate.getTime()); 
} 

public double getStartHour() { 
    return startHour; 
} 

public void setStartHour(double startHour) { 
    this.startHour = startHour; 
} 

public Date getStopDate() { 
    return stopDate; 
} 

public void setStopDate(Date stopDate) { 
    this.stopDate = new Date(stopDate.getTime()); 
} 

public double getStopHour() { 
    return stopHour; 
} 

public void setStopHour(double stopHour) { 
    this.stopHour = stopHour; 
} 

public double getInputQTime() { 
    return inputQTime; 
} 

public void setInputQTime(double inputQTime) { 
    this.inputQTime = inputQTime; 
} 

public double getElapsedTime() { 
    return elapsedTime; 
} 

public void setElapsedTime(double elapsedTime) { 
    this.elapsedTime = elapsedTime; 
} 

public double getCpuTime() { 
    return cpuTime; 
} 

public void setCpuTime(double cpuTime) { 
    this.cpuTime = cpuTime; 
} 

public int getTranCount() { 
    return tranCount; 
} 

public void setTranCount(int tranCount) { 
    this.tranCount = tranCount; 
} 

public void readFields(DataInput input) throws IOException { 
    setNpaNxx(input.readUTF()); 
    setTransactionCode(input.readUTF()); 
    setTransactionType(input.readUTF()); 
    setTranDate(new Date(input.readLong())); 
    setStartDate(new Date(input.readLong())); 
    setStopDate(new Date(input.readLong())); 
    setLoadKey(input.readInt()); 
    setTranClassKey(input.readInt()); 
    setTranHour(input.readDouble()); 
    setStartHour(input.readDouble()); 
    setStopHour(input.readDouble()); 
    setInputQTime(input.readDouble()); 
    setElapsedTime(input.readDouble()); 
    setCpuTime(input.readDouble()); 
    setTranCount(input.readInt()); 
} 

public void write(DataOutput output) throws IOException { 
    output.writeUTF(npaNxx); 
    output.writeUTF(transactionCode); 
    output.writeUTF(transactionType); 
    output.writeLong(tranDate.getTime()); 
    output.writeLong(startDate.getTime()); 
    output.writeLong(stopDate.getTime()); 
    output.writeInt(loadKey); 
    output.writeInt(tranClassKey); 
    output.writeDouble(tranHour); 
    output.writeDouble(startHour); 
    output.writeDouble(stopHour); 
    output.writeDouble(inputQTime); 
    output.writeDouble(elapsedTime); 
    output.writeDouble(cpuTime); 
    output.writeInt(tranCount); 
} 

public int compareTo(IMSTranOut o) { 

    return (Integer.compare(loadKey, o.getLoadKey()) == 0 
      && Integer.compare(tranClassKey, o.getTranClassKey()) == 0 
      && npaNxx.compareTo(o.getNpaNxx()) == 0 
      && transactionCode.compareTo(o.getTransactionCode()) == 0 
      && (transactionType.compareTo(o.getTransactionType()) == 0) 
      && tranDate.compareTo(o.getTranDate()) == 0 
      && Double.compare(tranHour, o.getTranHour()) == 0 
      && startDate.compareTo(o.getStartDate()) == 0 
      && Double.compare(startHour, o.getStartHour()) == 0 
      && stopDate.compareTo(o.getStopDate()) == 0 
      && Double.compare(stopHour, o.getStopHour()) == 0) ? 0 : 1; 

} 
} 

Реализация перезаписываемого класса для комплексных значений:

public class IMSTranSums 
implements Writable{ 


     private double inputQTime; 
     private double elapsedTime; 
     private double cpuTime; 
     private int tranCount; 

     public double getInputQTime() { 
      return inputQTime; 
     } 
     public void setInputQTime(double inputQTime) { 
      this.inputQTime = inputQTime; 
     } 
     public double getElapsedTime() { 
      return elapsedTime; 
     } 
     public void setElapsedTime(double elapsedTime) { 
      this.elapsedTime = elapsedTime; 
     } 
     public double getCpuTime() { 
      return cpuTime; 
     } 
     public void setCpuTime(double cpuTime) { 
      this.cpuTime = cpuTime; 
     } 
     public int getTranCount() { 
      return tranCount; 
     } 
     public void setTranCount(int tranCount) { 
      this.tranCount = tranCount; 
     } 

     public void write(DataOutput output) throws IOException { 
      output.writeDouble(inputQTime); 
      output.writeDouble(elapsedTime); 
      output.writeDouble(cpuTime); 
      output.writeInt(tranCount); 
     } 
     public void readFields(DataInput input) throws IOException { 
      inputQTime=input.readDouble(); 
      elapsedTime=input.readDouble(); 
      cpuTime=input.readDouble(); 
      tranCount=input.readInt(); 

     } 
} 
+0

Ваш «compareTo» ошибочен, он полностью сработает алгоритм сортировки, потому что вы вообще ничего не заказываете. Вы когда-нибудь думали об использовании «CompareToBuilder» из сообщества или «ComparisonChain» из guava? –

+1

Так очевидно! Спасибо! Я неправильно понял использование compareTo и думал об этом как о проверке равенства. Теперь работая над решением, отчитается. –

+0

Он работал как шарм! Еще раз спасибо. Как я могу отблагодарить вас за решение? –

ответ

0

Вашего compareTo недостатков, это будет полностью провалить алгоритм сортировки, потому что вы, кажется, нарушаете переходность в своем заказе.

Я бы порекомендовал вам использовать CompareToBuilder от Apache Commons или ComparisonChain от Guava, чтобы ваши сравнения были более читабельными (и исправлены!).