0

Добрый вечер.Проблема с меткой времени Java в Scala - java.lang.IllegalArgumentException Получена ошибка

Я выполняю сравнительную работу по работе с RDD, Dataframes и наборами данных в Spark 2.1.0 (используя встроенную Scala 2.11.8). Я загрузил некоторые свободно доступные данные из https://data.london.gov.uk/dataset/smartmeter-energy-use-data-in-london-households и выполнил сценарий позже, как показано на нем. Чтобы дать Вам предварительный просмотр, опрашиваемых данных выглядит следующим образом:

LCLid,stdorToU,DateTime,KWH/hh (per half hour) ,Acorn,Acorn_grouped 
MAC000002,Std,2012-10-12 00:30:00.0000000, 0 ,ACORN-A,Affluent 
MAC000002,Std,2012-10-12 01:00:00.0000000, 0 ,ACORN-A,Affluent 
MAC000002,Std,2012-10-12 01:30:00.0000000, 0 ,ACORN-A,Affluent 
MAC000002,Std,2012-10-12 02:00:00.0000000, 0 ,ACORN-A,Affluent 
MAC000002,Std,2012-10-12 02:30:00.0000000, 0 ,ACORN-A,Affluent 
MAC000002,Std,2012-10-12 03:00:00.0000000, 0 ,ACORN-A,Affluent 
MAC000002,Std,2012-10-12 03:30:00.0000000, 0 ,ACORN-A,Affluent 
MAC000002,Std,2012-10-12 04:00:00.0000000, 0 ,ACORN-A,Affluent 

Чтобы достичь своей сравнительной работы, я раз искру на различных этапах импорта и преобразования [String, String, Отметка, Double, String, String] из 6 переменных, выраженных выше. Мне удалось сопоставить данные в Dataframe и Dataset, но не может достичь такого же результата с точки зрения RDD. Everytime я пытаюсь преобразовать файл в РДУ, я получаю следующее сообщение об ошибке:

ERROR Executor: Exception in task 0.0 in stage 3.0 (TID 3) 
java.lang.IllegalArgumentException: Timestamp format must be yyyy-mm-dd hh:mm:ss[.fffffffff] 

Я очень смущен, так как переменная «DateTime» уже выражается в формате временной метки в «гггг-мм-дд чч: мм: сс [.fffffffff]». Я прочитал сообщения, такие как эти (Convert Date to Timestamp in Scala, How to convert unix timestamp to date in Spark, Spark SQL: parse timestamp without seconds), но не удовлетворяют мои потребности.

Это еще более запутанно, поскольку определенный класс 'londonDataSchemaDS', который я построил, работает в моем преобразовании Dataset, но не на моем RDD.

Это сценарий, я использовал:

import java.io.File 
import java.sql.Timestamp 

import org.apache.spark.sql.SparkSession 
import org.apache.spark.sql.types.{DataTypes, StructField, StructType} 

val sparkSession = SparkSession.builder.appName("SmartData London").master("local[*]").getOrCreate() 

val LCLid = StructField("LCLid", DataTypes.StringType) 
val stdorToU = StructField("stdorToU", DataTypes.StringType) 
val DateTime = StructField("DateTime", DataTypes.TimestampType) 
val KWHhh = StructField("KWH/hh (per half hour) ", DataTypes.DoubleType) 
val Acorn = StructField("Acorn", DataTypes.StringType) 
val Acorn_grouped = StructField("Acorn_grouped", DataTypes.StringType) 

val fields = Array(LCLid,stdorToU,DateTime,KWHhh,Acorn,Acorn_grouped) 
val londonDataSchemaDF = StructType(fields) 

import sparkSession.implicits._ 

case class londonDataSchemaDS(LCLid: String, stdorToU: String, DateTime: java.sql.Timestamp, KWHhh: Double, Acorn: String, Acorn_grouped: String) 

val t0 = System.nanoTime() 

val loadFileRDD=sparkSession.sparkContext.textFile("C:/Data/Smart_Data_London/Power-Networks-LCL-June2015(withAcornGps).csv_Pieces/Power-Networks-LCL-June2015(withAcornGps)v2_1.csv") 
.map(_.split(",")) 
.map(r=>londonDataSchemaDS(r(0), r(1), Timestamp.valueOf(r(2)), r(3).toDouble, r(4), r(5))) 

val t1 = System.nanoTime() 

val loadFileDF=sparkSession.read.schema(londonDataSchemaDF).option("header", true) 
.csv("C:/Data/Smart_Data_London/Power-Networks-LCL-June2015(withAcornGps).csv_Pieces/Power-Networks-LCL-June2015(withAcornGps)v2_1.csv") 

val t2=System.nanoTime() 

val loadFileDS=sparkSession.read.option("header", "true") 
.csv("C:/Data/Smart_Data_London/Power-Networks-LCL-June2015(withAcornGps).csv_Pieces/Power-Networks-LCL-June2015(withAcornGps)v2_1.csv") 
.withColumn("DateTime", $"DateTime".cast("timestamp")) 
.withColumnRenamed("KWH/hh (per half hour) ", "KWHhh") 
.withColumn("KWHhh", $"KWHhh".cast("double")) 
.as[londonDataSchemaDS] 

val t3 = System.nanoTime() 

loadFileRDD.take(10) 

loadFileDF.show(10, false) 
loadFileDF.printSchema() 

loadFileDS.show(10, false) 
loadFileDS.printSchema() 

println("Time Elapsed to implement RDD: " + (t1 - t0) * 1E-9 + " seconds") 
println("Time Elapsed to implement DataFrame: " + (t2 - t1) * 1E-9 + " seconds") 
println("Time Elapsed to implement Dataset: " + (t3 - t2) * 1E-9 + " seconds") 

Любая помощь по этому вопросу будет наиболее высокую оценку и/или толчок в правильном направлении.

Большое спасибо,

Christian

ответ

0

Я знаю, что я сделал неправильно. Я был настолько погружен в преобразование DataFrame и Dataset, у которого есть встроенная функция, чтобы пропустить заголовок, что я забыл удалить заголовок из процесса преобразования RDD.

Добавляя строки ниже, я удалить заголовок и успешно преобразовать мой файл CSV в РД (Это объясняет, почему я получаю ошибку форматирования в Timestamp):

val loadFileRDDwH=sparkSession.sparkContext.textFile("C:/Data/Smart_Data_London/Power-Networks-LCL-June2015(withAcornGps).csv_Pieces/Power-Networks-LCL-June2015(withAcornGps)v2_1.csv").map(_.split(",")) 

val header=loadFileRDDwH.first() 

val loadFileRDD=loadFileRDDwH.filter(_(0) != header(0)).map(r=>londonDataSchemaDS(r(0), r(1), Timestamp.valueOf(r(2)), r(3).split("\\s+").mkString.toDouble, r(4), r(5))) 

Спасибо за чтение

Christian