2017-01-24 11 views
0

Я пытаюсь запустить кластер EMR с простым выполнением шага Spark, и я столкнулся с ошибкой, которую я не могу решить. Программа работает, когда я запускаю ее локально в Eclipse, но не тогда, когда я запускаю ее в кластере EMR. Программа просто пытается конвертировать CSV-файл на S3 в формат Паркета.Ошибка с spark-csv на Amazon EMR Cluster

Когда я бег в ОМ, я получаю следующее сообщение об ошибке:

Caused by: com.univocity.parsers.common.TextParsingException: Length of parsed input (1000001) exceeds the maximum number of characters defined in your parser settings (1000000). Identified line separator characters in the parsed content. This may be the cause of the error. The line separator in your parser settings is set to '\n'. Parsed content:

у меня нет ни одного поля над 1000000 пределом. Я пробовал читать из s3, s3n и s3a.

import org.apache.spark.SparkSession 
    import org.apache.spark.sql.types._ 

    object TestEMR { 
     def main(args: Array[String]) { 
     val spark = SparkSession.builder().appName("Spark Convert to Parquet").getOrCreate() 
     val schema = StructType(
      Array(
       StructField("field1", StringType ,nullable = true), 
       StructField("field2", IntegerType ,nullable = true), 
       StructField("field3", IntegerType ,nullable = true), 
       StructField("field4", TimestampType ,nullable = true), 
       StructField("field5", TimestampType ,nullable = true), 
       StructField("field6", StringType ,nullable = true), 
       StructField("field7", StringType ,nullable = true), 
       StructField("field8", StringType ,nullable = true), 
       StructField("field9", StringType ,nullable = true), 
       StructField("field10", StringType ,nullable = true), 
       StructField("field11", StringType ,nullable = true), 
       StructField("field12", StringType ,nullable = true), 
       StructField("field13", StringType ,nullable = true), 
       StructField("field14", StringType ,nullable = true), 
       StructField("field15", StringType ,nullable = true), 
       StructField("field16", StringType ,nullable = true), 
       StructField("field17", StringType ,nullable = true), 
       StructField("field18", StringType ,nullable = true), 
       StructField("field19", StringType ,nullable = true), 
       StructField("field20", StringType ,nullable = true) 
      ) 
     ) 

     val df = spark.read 
      .format("com.databricks.spark.csv") 
      .schema(schema) 
      .option("nullValue","") 
      .option("treatEmptyValuesAsNulls","true") 
      .load("s3://mybucket/input/myfile.csv") 
     df.write.mode("append").parquet("s3://mybucket/output/myfile") 
     spark.stop 
     } 
    } 

ответ

0

кажется, что это не найдя конец строки, так говорится в непрерывно, пока не достигнет этого предела 10K символов в одной строке.

Как говорится: проверка перевода строки файла

+0

Файл замечательный. Операция загрузки просто не разбивает файл на новую строку. Мне удалось преобразовать код в файл sc.texfile (myfile), и он отлично прочитал файл. –

+0

интересный. FWIW Spark 2 имеет встроенный синтаксический анализатор CSV, что означает, что вы получаете файлы JIRA против команды искры. Интегрированные тесты s3a, которые я запускаю, работают с файлом .csv.gz на s3 с этим модулем –

0

Этот вопрос остается открытым в spark-csv jira. Они предоставили обходной путь, например, используя открытый синтаксический анализатор csv, если у вас нет данных или чтения в качестве RDD, а затем создается dataframe.

val rdd = sc.textFile("file.csv") 
// Here, filtering or transformation 
//val filteredRDD = rdd.filter.. 
//val transformedRDD = rdd.map.. 

val df = new CsvParser().csvRdd(sqlContext, transformedRDD) 

 Смежные вопросы

  • Нет связанных вопросов^_^