2017-02-01 7 views
0

У меня есть текстовый файл, который делится на ~, мне нужно выполнить синтаксический анализ перед преобразованием в dataframe. Код читается в текстовом файле, так как RDD [String] выполняет синтаксический анализ. Затем он преобразуется в RDD [Row]. Затем с помощью схемы создается dataframe.Преобразование DataFrame Spark Scala

Итак, ниже приведен следующий код. Он работает, но проблема заключается в том, что фактическая схема составляет 400 полей. Мне было интересно, есть ли более простой способ, чем набирать атрибуты (1), атрибуты (2), атрибуты (3) ... так далее.

Я в настоящее время на Искры 1.6. CDH 5.2.2

Пример ввода:

20161481132310 ~  ~"This" is a comma 10 

Текущий код:

val schema_1 = StructType(Array(
StructField("EXAMPLE_1", StringType, true), 
StructField("EXAMPLE_2", StringType, true), 
StructField("EXAMPLE_3", StringType, true))) 

val rdd = sc.textFile("example.txt") 
val rdd_truncate = rdd.map(_.split("~").map(_.trim).mkString("~")) 
val row_final = rdd_truncate 
    .map(_.split("~")) 
    .map(attributes => Row(attributes(0), 
    attributes(1), 
    attributes(2))) 

val df = sqlContext.createDataFrame(row_final, schema_1) 

на основе внушения я модифицированном для следующих. Он работает, за исключением кавычек. «Это» на входе не удастся. Какие-либо предложения?

val df = sqlContext.read 
     .format("com.databricks.spark.csv") 
     .option("delimiter","~") 
     .schema(schema) 
     .load("example.txt") 
val df_final = df.select(df.columns.map(c =>trim(col(c)).alias(c)): _*) 

ответ

3

Просто используйте стандартный CSV читателя:

spark.read.schema(schema).option("delimiter", "~").csv("example.txt") 

Если вы хотите обрезать поля просто использовать select:

import org.apache.spark.sql.functions.{col, trim} 

df.select(df.columns.map(c => trim(col(c)).alias(c)): _*) 

Если вы используете Спарк 1.x вы можете использовать spark-csv:

sqlContext.read 
    .format("com.databricks.spark.csv") 
    .schema(schema) 
    .option("delimiter", "~") 
    .load("example.txt") 

Если этого не достаточно, вы можете использовать Row.fromSeq:

Row.fromSeq(line.split("~").take(3))