2016-12-05 13 views
1

Мне нужно создать паркетный файл из CSV-файлов с помощью настраиваемого файл JSON схемы, как это:NumberFormatException, когда я пытаюсь создать паркетный файл с пользовательской схемой и типы BigDecimal

{"type" : "struct","fields" : [ {"name" : "tenor_bank","type" : "string","nullable" : false}, {"name":"tenor_frtb", "type":"string", "nullable":false}, {"name":"weight", "type":"decimal(25,5)", "nullable":false} ]} 

Пожалуйста, примите посмотрите на поле с именем weight.

Вот как это выглядит файл ввода CSV:

tenor_1;tenor_2;weight 
1D;3M;1 
7D;3M;1 
1W;3M;1 
1OD;3M;1 
14D;3M;1 
2W;3M;1 
21D;3M;1 
3W;3M;1 
28D;3M;1 
30D;3M;1 
1M;3M;1 
56D;3M;1 
60D;3M;1 
2M;3M;1 
61D;3M;1 
84D;3M;1 
90D;3M;1 
3M;3M;1 
91D;3M;1 
92D;3M;1 
112D;3M;0.8333 
112D;6M;0.1667 

Это, как я загружаю JSon файл схемы с его DataFrame:

val path: Path = new Path(mra_schema_parquet) 
    val fileSystem = path.getFileSystem(sc.hadoopConfiguration) 

    val inputStream: FSDataInputStream = fileSystem.open(path) 

    val schema_json = Stream.cons(inputStream.readLine(), Stream.continually(inputStream.readLine)) 

    logger.debug("schema_json looks like " + schema_json.head) 

    val mySchemaStructType = DataType.fromJson(schema_json.head).asInstanceOf[StructType] 

    logger.debug("mySchemaStructType is " + mySchemaStructType) 

    myDF = loadCSV(sqlContext, path_input_csv,separator,mySchemaStructType ,header) 
    logger.debug("myDF.schema.json looks like " + myDF.schema.json) 
    inputStream.close() 

    //finally I create the parquet file. This line provokes the NuumberFormatException, concretely the line with .parquet(pathParquet) 

    writeDataFrame2Parquet(myDF, path_output_parquet, saveMode,header,separator) 

//some utilities 
def loadCSV(sqlContext : SQLContext, pathCSV: String, separator: String, customSchema: StructType, haveSchema: String): DataFrame = { 

logger.info("loadCSV. header is " + haveSchema.toString + ", inferSchema is false pathCSV is " + pathCSV + " separator is " + separator) 

sqlContext.read 
    .format("com.databricks.spark.csv") 
    .option("header", haveSchema) // Use first line of all files as header 
    .option("delimiter", separator) 
    .option("nullValue","") 
    //Esto provoca que pete en runtime si encuentra un fallo en la línea que esté parseando 
    .option("mode","FAILFAST") 
    .schema(customSchema) 
    .load(pathCSV) 

}

def writeDataFrame2Parquet(df: DataFrame, pathParquet: String, saveMode: SaveMode,header: String,delimiter:String): Unit = { 

df.write 
    .format("com.databricks.spark.csv") 
    .option("header", header) 
    .option("delimiter",delimiter) 
    .option("nullValue","") 
    .mode(saveMode) 
    .parquet(pathParquet) 

}

Когда экс ecution достигает последнюю строку, .parquet (pathParquet), исключение происходит:

Caused by: org.apache.spark.SparkException: Task failed while writing rows. 
at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:250) 
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150) 
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150) 
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
at org.apache.spark.scheduler.Task.run(Task.scala:88) 
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
at java.lang.Thread.run(Thread.java:745) 
**Caused by: java.lang.NumberFormatException** 
at java.math.BigDecimal.<init>(BigDecimal.java:545) 
at java.math.BigDecimal.<init>(BigDecimal.java:739) 
at com.databricks.spark.csv.util.TypeCast$.castTo(TypeCast.scala:68) 
at com.databricks.spark.csv.CsvRelation$$anonfun$buildScan$2.apply(CsvRelation.scala:121) 
at com.databricks.spark.csv.CsvRelation$$anonfun$buildScan$2.apply(CsvRelation.scala:108) 
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) 
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) 
at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:240) 
... 8 more 

Похоже, когда искровой CSV пытается визуализировать поле «веса» в виде десятичной дроби (25,5), библиотеки аварий , Кто-нибудь может мне помочь?

спасибо.

ответ

1

Просто замените запятые с точками: 0,8333 к 0.8333 Потому что, как вы можете видеть:

scala> BigDecimal("0.8333") 
res16: scala.math.BigDecimal = 0.8333 

scala> BigDecimal("0,8333") 
java.lang.NumberFormatException 
    at java.math.BigDecimal.<init>(BigDecimal.java:494) 
    at java.math.BigDecimal.<init>(BigDecimal.java:383) 
    at java.math.BigDecimal.<init>(BigDecimal.java:806) 
    at scala.math.BigDecimal$.exact(BigDecimal.scala:125) 
    at scala.math.BigDecimal$.apply(BigDecimal.scala:283) 
    ... 33 elided 
+0

спасибо за помощь @ipoteka, но это не сработало. После того как я изменил файл csv, произойдет одно и то же исключение. Как создать файл паркета, начинающийся с файла csv с помощью поля BigDecimal внутри него, когда я загружаю этот файл паркета с фреймом данных, и это поле является BigDecimal? – aironman

+0

@aironman Strange. И в основном вам лучше иметь файлы в правильном формате, а затем создавать и отображать некоторые вещи. – ipoteka

+0

kudos для вас @ipoteka. После того, как я внимательно рассмотрел файл csv ввода, я заметил пустое поле, когда файл схемы json говорит, что значение nullable является ложным в этом поле. arggg! – aironman

 Смежные вопросы

  • Нет связанных вопросов^_^