2016-10-27 6 views
4

Я пытаюсь преобразовать удаленную таблицу MySQL в файл паркета, используя искру 1.6.2.Исключение ошибки при преобразовании таблицы MySQL в паркет

Процесс проходит в течение 10 минут, заполняя память, чем начинается с этими сообщениями:

WARN NettyRpcEndpointRef: Error sending message [message = Heartbeat(driver,[Lscala.Tuple2;@dac44da,BlockManagerId(driver, localhost, 46158))] in 1 attempts 
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [10 seconds]. This timeout is controlled by spark.executor.heartbeatInterval 

в конце терпит неудачу с этой ошибкой:

ERROR ActorSystemImpl: Uncaught fatal error from thread [sparkDriverActorSystem-scheduler-1] shutting down ActorSystem [sparkDriverActorSystem] 
java.lang.OutOfMemoryError: GC overhead limit exceeded 

Я бег его в искровой оболочки с этими командами:

spark-shell --packages mysql:mysql-connector-java:5.1.26 org.slf4j:slf4j-simple:1.7.21 --driver-memory 12G 

val dataframe_mysql = sqlContext.read.format("jdbc").option("url", "jdbc:mysql://.../table").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "...").option("user", "...").option("password", "...").load() 

dataframe_mysql.saveAsParquetFile("name.parquet") 

У меня есть ограничения на максимальную память исполнителя до 12G. Есть ли способ заставить писать паркетный файл в «маленьких» кусках, освобождая память?

+0

Можете ли вы уточнить конфигурацию вашего кластера? – eliasah

+0

Это незаурядный несчастный –

+0

И каков размер ваших данных? Любой столбец раздела? – eliasah

ответ

3

Похоже, проблема заключалась в том, что при чтении данных с разъемом jdbc у вас не было раздела.

Чтение из JDBC по умолчанию не распространяется, поэтому для включения распространения вы должны установить ручное разбиение на разделы. Вам нужен столбец, который является хорошим ключом разметки, и вам нужно знать распределение вверх.

Это то, что выглядит как ваши данные, по-видимому:

root 
|-- id: long (nullable = false) 
|-- order_year: string (nullable = false) 
|-- order_number: string (nullable = false) 
|-- row_number: integer (nullable = false) 
|-- product_code: string (nullable = false) 
|-- name: string (nullable = false) 
|-- quantity: integer (nullable = false) 
|-- price: double (nullable = false) 
|-- price_vat: double (nullable = false) 
|-- created_at: timestamp (nullable = true) 
|-- updated_at: timestamp (nullable = true) 

order_year казалось хорошим кандидатом для меня. (Вы, кажется, ~ 20 лет в соответствии с вашими комментариями)

import org.apache.spark.sql.SQLContext 

val sqlContext: SQLContext = ??? 

val driver: String = ??? 
val connectionUrl: String = ??? 
val query: String = ??? 
val userName: String = ??? 
val password: String = ??? 

// Manual partitioning 
val partitionColumn: String = "order_year" 

val options: Map[String, String] = Map("driver" -> driver, 
    "url" -> connectionUrl, 
    "dbtable" -> query, 
    "user" -> userName, 
    "password" -> password, 
    "partitionColumn" -> partitionColumn, 
    "lowerBound" -> "0", 
    "upperBound" -> "3000", 
    "numPartitions" -> "300" 
) 

val df = sqlContext.read.format("jdbc").options(options).load() 

PS:partitionColumn, lowerBound, upperBound, numPartitions: Эти параметры должны все быть указаны, если любой из них указан.

Теперь вы можете сохранить DataFrame в паркет.