Я пытаюсь запустить кластер EMR с простым выполнением шага Spark, и я столкнулся с ошибкой, которую я не могу решить. Программа работает, когда я запускаю ее локально в Eclipse, но не тогда, когда я запускаю ее в кластере EMR. Программа просто пытается конвертировать CSV-файл на S3 в формат Паркета.Ошибка с spark-csv на Amazon EMR Cluster
Когда я бег в ОМ, я получаю следующее сообщение об ошибке:
Caused by: com.univocity.parsers.common.TextParsingException: Length of parsed input (1000001) exceeds the maximum number of characters defined in your parser settings (1000000). Identified line separator characters in the parsed content. This may be the cause of the error. The line separator in your parser settings is set to '\n'. Parsed content:
у меня нет ни одного поля над 1000000 пределом. Я пробовал читать из s3, s3n и s3a.
import org.apache.spark.SparkSession
import org.apache.spark.sql.types._
object TestEMR {
def main(args: Array[String]) {
val spark = SparkSession.builder().appName("Spark Convert to Parquet").getOrCreate()
val schema = StructType(
Array(
StructField("field1", StringType ,nullable = true),
StructField("field2", IntegerType ,nullable = true),
StructField("field3", IntegerType ,nullable = true),
StructField("field4", TimestampType ,nullable = true),
StructField("field5", TimestampType ,nullable = true),
StructField("field6", StringType ,nullable = true),
StructField("field7", StringType ,nullable = true),
StructField("field8", StringType ,nullable = true),
StructField("field9", StringType ,nullable = true),
StructField("field10", StringType ,nullable = true),
StructField("field11", StringType ,nullable = true),
StructField("field12", StringType ,nullable = true),
StructField("field13", StringType ,nullable = true),
StructField("field14", StringType ,nullable = true),
StructField("field15", StringType ,nullable = true),
StructField("field16", StringType ,nullable = true),
StructField("field17", StringType ,nullable = true),
StructField("field18", StringType ,nullable = true),
StructField("field19", StringType ,nullable = true),
StructField("field20", StringType ,nullable = true)
)
)
val df = spark.read
.format("com.databricks.spark.csv")
.schema(schema)
.option("nullValue","")
.option("treatEmptyValuesAsNulls","true")
.load("s3://mybucket/input/myfile.csv")
df.write.mode("append").parquet("s3://mybucket/output/myfile")
spark.stop
}
}
Файл замечательный. Операция загрузки просто не разбивает файл на новую строку. Мне удалось преобразовать код в файл sc.texfile (myfile), и он отлично прочитал файл. –
интересный. FWIW Spark 2 имеет встроенный синтаксический анализатор CSV, что означает, что вы получаете файлы JIRA против команды искры. Интегрированные тесты s3a, которые я запускаю, работают с файлом .csv.gz на s3 с этим модулем –