2015-10-02 5 views
8

Используя Spark 1.4.0, я пытаюсь вставить данные из Spark DataFrame в базу данных MemSQL (которая должна быть точно такой же, как взаимодействие с базой данных MySQL) с помощью insertIntoJdbc(). Однако я сохраняю исключение Runtime TableAlreadyExists.Spark DataFrame InsertIntoJDBC - TableAlreadyExists Exception

Сначала я создаю таблицу MemSQL так:

CREATE TABLE IF NOT EXISTS table1 (id INT AUTO_INCREMENT PRIMARY KEY, val INT); 

Затем я создаю простой dataframe в Спарк и попытаться вставить в MemSQL так:

val df = sc.parallelize(Array(123,234)).toDF.toDF("val") 
//df: org.apache.spark.sql.DataFrame = [val: int] 

df.insertIntoJDBC("jdbc:mysql://172.17.01:3306/test?user=root", "table1", false) 

java.lang.RuntimeException: Table table1 already exists. 

ответ

6

Это решение относится к общим соединений JDBC, хотя ответ на @wayne, вероятно, является лучшим решением для memSQL специально.

ВставитьIntoJdbc, кажется, устарели от 1.4.0, и используя его на самом деле вызывает write.jdbc().

write() возвращает объект DataFrameWriter. Если вы хотите добавить данные в свою таблицу, вам придется изменить режим сохранения объекта на "append".

Другая проблема с примером в вышеприведенном вопросе заключается в том, что схема DataFrame не соответствует схеме целевой таблицы.

В приведенном ниже коде приведен рабочий пример оболочки Spark. Я использую spark-shell --driver-class-path mysql-connector-java-5.1.36-bin.jar, чтобы начать сеанс моей искровой оболочки.

import java.util.Properties 

val prop = new Properties() 
prop.put("user", "root") 
prop.put("password", "") 

val df = sc.parallelize(Array((1,234), (2,1233))).toDF.toDF("id", "val") 
val dfWriter = df.write.mode("append") 

dfWriter.jdbc("jdbc:mysql://172.17.01:3306/test", "table1", prop) 
+2

Привет, Локоть, я использую искру 1.5, и я все еще получаю таблицу уже существует исключение, даже после того, как вы сказали write.mode («append»), вам нравится комментировать это? В базе данных уже есть объект с именем «customer_spark» –

+0

Hey @DJElbow, то же самое здесь, все еще получение исключения «Таблица» table1 «уже существует». когда write.mode (SaveMode.Append). Я проверил, и при использовании пользователя «root» он отлично работает, но при использовании пользователя с привилегиями CREATE/INSERT/UPDATE я получаю эту ошибку. – marnun

3

insertIntoJDBC документы фактически неверно ; они говорят, что таблица уже должна существовать, но на самом деле, если это произойдет, это будет сгенерировано сообщение об ошибке, как вы можете видеть выше:

https://github.com/apache/spark/blob/03cca5dce2cd7618b5c0e33163efb8502415b06e/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala#L264

Мы рекомендуем использовать наш разъем MemSQL Спарк, который вы можете найти здесь:

https://github.com/memsql/memsql-spark-connector

Если включить эту библиотеку и импорта com.memsql.spark.connector._ в вашем коде, вы можете использовать df.saveToMemSQL (...), чтобы сохранить DataFrame в MemSQL. Вы можете найти документацию для нашего соединителя здесь:

http://memsql.github.io/memsql-spark-connector/latest/api/#com.memsql.spark.connector.DataFrameFunctions

+0

Очень приятно. Это упрощает ситуацию. Есть ли скомпилированная банка для загрузки где-нибудь? Не удалось найти его. – DJElbow

+1

Если вы добавите maven.memsql.com в качестве преобразователя, вы можете включить его как зависимость в свой проект: https://github.com/memsql/memsql-spark-connector#using –

1

У меня был такой же выпуск. Обновление искровой версии до 1.6.2 работало нормально