2017-01-31 65 views
3

У меня есть данные в базе данных, и я хочу работать с ним в Spark, используя sparklyr.Передача данных из базы данных в Spark с использованием sparklyr

я могу использовать DBI -На пакет, чтобы импортировать данные из базы данных в R

dbconn <- dbConnect(<some connection args>) 
data_in_r <- dbReadTable(dbconn, "a table") 

затем скопировать данные из R Спарк с использованием

sconn <- spark_connect(<some connection args>) 
data_ptr <- copy_to(sconn, data_in_r) 

Копирование дважды медленно для больших наборов данных.

Как я могу скопировать данные непосредственно из базы данных в Spark?

sparklyr имеет несколько параметров для импорта, но ничего не найдено. sdf_import() выглядит как возможность, но неясно, как использовать его в этом контексте.

ответ

4

Sparklyr> = 0.6.0

Вы можете использовать spark_read_jdbc.

Sparklyr < 0.6.0

Я надеюсь, что есть более элегантное решение там, но здесь минимальный пример использования низкоуровневого API:

  • Убедитесь, что искра имеет доступ к требуемый драйвер JDBC, например, добавив свои координаты в spark.jars.packages. Например, с PostgreSQL (Adjust для текущей версии) можно добавить:

    spark.jars.packages org.postgresql:postgresql:9.4.1212 
    

    в SPARK_HOME/conf/spark-defaults.conf

  • Загрузить данные и зарегистрироваться в качестве временного вид:

    name <- "foo" 
    
    spark_session(sc) %>% 
        invoke("read") %>% 
        # JDBC URL and table name 
        invoke("option", "url", "jdbc:postgresql://host/database") %>% 
        invoke("option", "dbtable", "table") %>% 
        # Add optional credentials 
        invoke("option", "user", "scott") %>% 
        invoke("option", "password", "tiger") %>% 
        # Driver class, here for PostgreSQL 
        invoke("option", "driver", "org.postgresql.Driver") %>% 
        # Read and register as a temporary view 
        invoke("format", "jdbc") %>% 
        invoke("load") %>% 
        # Spark 2.x, registerTempTable in 1.x 
        invoke("createOrReplaceTempView", name) 
    

    Вы можете передать несколько options сразу с использованием environment:

    invoke("options", as.environment(list(
        user="scott", password="tiger", url="jdbc:..." 
    ))) 
    
  • нагрузки временный вид с dplyr:

    dplyr::tbl(sc, name) 
    
  • Обязательно прочитайте о дальнейших JDBC вариантов, с акцентом на partitionColumn, *Bound и numPartitions.

  • Для получения дополнительной информации смотрите, например How to use JDBC source to write and read data in (Py)Spark? и How to improve performance for slow Spark jobs using DataFrame and JDBC connection?