2015-07-20

MapReduce job fails on the full dataset

I have a dataset (input.csv) containing 35 columns (positions 0-34). When I run my MRv2 program on it, I get an "ArrayIndexOutOfBoundsException".

But when I run the same program on a snapshot of the dataset containing the same columns, it completes successfully.

Error

15/07/20 11:05:55 INFO mapreduce.Job: Task Id : attempt_1437379028043_0018_m_000000_2, Status : FAILED 
Error: java.lang.ArrayIndexOutOfBoundsException: 34 
    at lotus.staging.StageMapper.map(StageMapper.java:88) 
    at lotus.staging.StageMapper.map(StageMapper.java:1) 
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145) 
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:784) 
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341) 
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163) 
    at java.security.AccessController.doPrivileged(Native Method) 
    at javax.security.auth.Subject.doAs(Subject.java:415) 
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628) 
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158) 

StageMapper

package lotus.staging; 

import java.io.IOException; 
import java.text.DateFormat; 
import java.text.ParseException; 
import java.text.SimpleDateFormat; 
import java.util.Date; 

import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Mapper; 

public class StageMapper extends Mapper<LongWritable, Text, Text, Text> { 

@Override 
public void map(LongWritable key, Text value, Context context) 
     throws IOException, InterruptedException { 
    String[] record = value.toString().split(","); 

    // Key 
    String stg_table = null; 

    String report_code = record[0].trim(); 
    String product_type_description = null; 
    String principal_amount = record[1]; 
    String funded = record[2].trim(); 
    String facility_id = record[3]; 
    String loan_id = record[4]; 

    // Start Date 
    String start_date = record[5]; 

    // Maturity Date 
    String end_date = record[6]; 

    DateFormat df = new SimpleDateFormat("MM/dd/yyyy"); 
    Date startDate; 
    Date endDate; 
    long diff; 
    long diffDays = 0l; 

    try { 
     startDate = df.parse(start_date); 
     endDate = df.parse(end_date); 
     df.format(startDate); 
     df.format(endDate); 
     diff = endDate.getTime() - startDate.getTime(); 
     diffDays = diff/(24 * 60 * 60 * 1000); 
    } catch (ParseException e) { 
     e.printStackTrace(); 
    } 

    // Date Diff 
    String date_diff = String.valueOf(diffDays); 

    String next_reset_date = record[7]; 
    String interest_rate = record[8]; 
    String base_interest_rate = record[9]; 
    String counterparty_industry_id = record[10]; 
    String industry_name = record[11]; 
    String counterparty_id = record[12]; 
    String counterparty_name = record[13]; 

    // Bank Number 
    String vehicle_code = record[14]; 

    String vehicle_description = record[15]; 

    // Branch Number 
    String cost_center_code = record[16]; 

    String branch_borrower_name = record[17]; 
    String igl_code = record[20]; 

    // Participation Bal Begin Month 
    String participated_amt = record[21]; 

    String sys_id = record[23]; 

    // Loan To Value 
    String ltv = record[26]; 

    String accrual_status = record[27]; 
    String country_code = record[30]; 
    String fiscal_year = record[31]; 
    String accounting_period = record[32]; 
    String accounting_day = record[33]; 
    String control_category = record[34]; 

    // CONTROL_CATEGORY_DESC, Secred_BY_Re 

    if (report_code.equalsIgnoreCase("1")) { 
     product_type_description = "Commercial_Loan"; 
     stg_table = "stg_lon"; 
    } else if (report_code.equalsIgnoreCase("2")) { 
     product_type_description = "Mortgage_Loan"; 
     stg_table = "stg_mgt"; 
    } else if (report_code.equalsIgnoreCase("3")) { 
     product_type_description = "Installment_Loan"; 
     stg_table = "stg_lon"; 
    } else if (report_code.equalsIgnoreCase("4")) { 
     product_type_description = "Revolving Credit"; 
     stg_table = "stg_lon"; 
    } 

    // Value 
    String data = report_code + "," + product_type_description + "," 
      + principal_amount + "," + funded + "," + facility_id + "," 
      + loan_id + "," + start_date + "," + end_date + "," + date_diff 
      + "," + next_reset_date + "," + interest_rate + "," 
      + base_interest_rate + "," + counterparty_industry_id + "," 
      + industry_name + "," + counterparty_id + "," 
      + counterparty_name + "," + vehicle_code + "," 
      + vehicle_description + "," + cost_center_code + "," 
      + branch_borrower_name + "," + igl_code + "," 
      + participated_amt + "," + sys_id + "," + ltv + "," 
      + accrual_status + "," + country_code + "," + fiscal_year + "," 
      + accounting_period + "," + accounting_day + "," 
      + control_category; 

    context.write(new Text(stg_table), new Text(data)); 

} // map() ends 
} // Mapper ends 

StageReducer

package lotus.staging; 

import java.io.IOException; 

import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Reducer; 
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; 

public class StageReducer extends Reducer<Text, Text, Text, Text> { 

    private MultipleOutputs mos; 

    @Override 
    protected void setup(Context context) throws IOException, 
      InterruptedException { 
     mos = new MultipleOutputs(context); 
    } 

    @Override 
    public void reduce(Text key, Iterable<Text> values, Context context) 
      throws IOException, InterruptedException { 
     for (Text value : values) { 
      mos.write(key, value, key.toString()); 
     } 
    } 

    @Override 
    protected void cleanup(Context context) throws IOException, 
      InterruptedException { 
     mos.close(); 
    } 
} 

StageDriver

package lotus.staging; 

import java.io.IOException; 

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat; 
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 

public class StageDriver { 
    // Main 
    public static void main(String[] args) throws IOException, 
      ClassNotFoundException, InterruptedException { 
     Configuration conf = new Configuration(); 
     Job job = Job.getInstance(conf, "StageDriver"); 

     // conf.set("mapreduce.textoutputformat.separator", ","); 
     // conf.set("mapreduce.output.textoutputformat.separator", ","); 
     //conf.set("mapreduce.output.key.field.separator", ","); 

     job.setJarByClass(StageDriver.class); 
     LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class); 


     // Mapper and Mapper-Output Key 
     job.setMapperClass(StageMapper.class); 
     job.setMapOutputKeyClass(Text.class); 
     conf.set("mapred.max.split.size", "1020"); 


     // Reducer and Output Key and Value 
     job.setReducerClass(StageReducer.class); 
     job.setOutputKeyClass(Text.class); 
     job.setOutputValueClass(Text.class); 

     // Input parameters to execute 
     FileInputFormat.setInputPaths(job, new Path(args[0])); 
     FileOutputFormat.setOutputPath(job, new Path(args[1])); 

     // deleting the output path automatically from hdfs so that we don't 
     // have delete it explicitly 

     // outputPath.getFileSystem(conf).delete(outputPath); 

     // exiting the job only if the flag value becomes false 

     System.exit(job.waitForCompletion(true) ? 0 : 1); 
    } 

} 

Below are the datasets:

Snapshot-Dataset, Complete-Dataset

Please help.

Answer

One of the rows in input.csv is either incomplete or contains a formatting error (for example an improperly escaped field). Try to find out which row it is. You can catch the exception where the error occurs, print the row number, and then fix your data:

try { 
    // code where the ArrayIndexOutOfBoundsException happens 
} catch (Exception e) { 
    LOG.warn(String.format("Invalid data in row: %d", row)); 
    System.out.println(String.format("Invalid data in row: %d", row)); 
} 
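
One thing worth ruling out first (my own note, not part of the original answer): Java's String.split(",") drops trailing empty fields, so a row whose last few columns are blank yields an array with fewer than 35 elements even though all 34 commas are present. A minimal sketch of that behavior:

public class SplitDemo { 
    public static void main(String[] args) { 
        String row = "1,100,Y,,,";                      // last three columns are empty 
        System.out.println(row.split(",").length);      // prints 3 - trailing empty fields are dropped 
        System.out.println(row.split(",", -1).length);  // prints 6 - limit -1 keeps the trailing empties 
    } 
} 

Passing -1 as the split limit does not replace the try/catch above, but it quickly rules out blank trailing columns as the cause.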

So in your case, applying that pattern to your map() method looks like this:

@Override 
public void map(LongWritable key, Text value, Context context) 
     throws IOException, InterruptedException { 
    String[] record = value.toString().split(","); 

    // Key 
    String stg_table = null; 
try{ 
    String report_code = record[0].trim(); 
    String product_type_description = null; 
    String principal_amount = record[1]; 
    String funded = record[2].trim(); 
    String facility_id = record[3]; 
    String loan_id = record[4]; 

    // Start Date 
    String start_date = record[5]; 

    // Maturity Date 
    String end_date = record[6]; 



DateFormat df = new SimpleDateFormat("MM/dd/yyyy"); 
Date startDate; 
Date endDate; 
long diff; 
long diffDays = 0l; 

try { 
    startDate = df.parse(start_date); 
    endDate = df.parse(end_date); 
    df.format(startDate); 
    df.format(endDate); 
    diff = endDate.getTime() - startDate.getTime(); 
    diffDays = diff/(24 * 60 * 60 * 1000); 
} catch (ParseException e) { 
    e.printStackTrace(); 
} 

// Date Diff 
String date_diff = String.valueOf(diffDays); 

String next_reset_date = record[7]; 
String interest_rate = record[8]; 
String base_interest_rate = record[9]; 
String counterparty_industry_id = record[10]; 
String industry_name = record[11]; 
String counterparty_id = record[12]; 
String counterparty_name = record[13]; 

// Bank Number 
String vehicle_code = record[14]; 

String vehicle_description = record[15]; 

// Branch Number 
String cost_center_code = record[16]; 

String branch_borrower_name = record[17]; 
String igl_code = record[20]; 

// Participation Bal Begin Month 
String participated_amt = record[21]; 

String sys_id = record[23]; 

// Loan To Value 
String ltv = record[26]; 

String accrual_status = record[27]; 
String country_code = record[30]; 
String fiscal_year = record[31]; 
String accounting_period = record[32]; 
String accounting_day = record[33]; 
String control_category = record[34]; 

} 
    catch (Exception e) { 
        if (record.length > 0) { 
            // LOG.warn(String.format("Invalid data in row: %s", record[0].trim())); 
            System.out.println(String.format("Invalid data in record id: %s", record[0].trim())); 
        } else { 
            System.out.println("Empty Record Found"); 
        } 
        return; 
    } 
... 

I use the record id here because you do not have the line number available, but you can search your file for that record id, and presumably each record has at least a first field. Otherwise you can also check whether the record is empty.
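
A variant of the same idea, sketched under the assumption that a valid row must have at least 35 fields (the counter group and names below are illustrative, not from the original answer): check the array length up front and count the bad rows with a Hadoop counter, so malformed records show up in the job's counter summary instead of only in scattered task logs.

// Defensive sketch of the start of map(); the 35-field minimum and counter names are assumptions. 
@Override 
public void map(LongWritable key, Text value, Context context) 
        throws IOException, InterruptedException { 
    // limit -1 keeps trailing empty fields instead of silently dropping them 
    String[] record = value.toString().split(",", -1); 

    if (record.length < 35) { 
        // shows up in the job counters once the job finishes 
        context.getCounter("StageMapper", "MALFORMED_ROWS").increment(1); 
        System.out.println(String.format("Skipping short row (%d fields) at byte offset %d: %s", 
                record.length, key.get(), value.toString())); 
        return; 
    } 

    // ... the existing field extraction and context.write() continue as before 
} 

After the run, the MALFORMED_ROWS counter printed with the job summary tells you how many rows were skipped, and the logged byte offsets point at the exact lines to inspect in input.csv.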

+0

Could you provide a code snippet for it? – user3343543

+0

You should look at the stack trace to find out where exactly the out-of-bounds access happens. That is where you want to handle the exception. –

+0

I have edited the question to include the error. – user3343543