2016-09-19 9 views
1

Моя цель - загрузить большой R data.frame в Spark. Размер data.frame составляет 5 мил. строк и 7 столбцов различных типов. После загрузки в R этот файл данных занимает ок. 200 мб памяти. Однако, когда я пытаюсь загрузить его в Spark с помощью функции as.DataFrame(), сеанс R занят навсегда, он работает в течение 1 часа, и мне пришлось отменить операцию.Как загрузить большие R data.frames в Spark, используя SparkR as.DataFrame()?

Вот подробности:

Я создаю следующий набор данных для использования в этом примере:

n=5e6 # set sample size 

d <- data.frame(
    v1=base::sample(1:9,n,replace=TRUE), 
    v2=base::sample(1000:9000,n,replace=TRUE), 
    v3=seq(as.Date("2016-01-01"), as.Date("2016-12-31"), by = "day")[base::sample(1:365,n,replace=TRUE)], 
    v4=LETTERS[base::sample(1:length(LETTERS),n,replace=TRUE)], 
    v5=base::sample(1000:9000,n,replace=TRUE)/1000, 
    v6=seq(ISOdate(2016,1,1), ISOdate(2018,1,1), "sec")[base::sample(1:63158401,n,replace=TRUE)], 
    v7=c(TRUE,FALSE)[base::sample(1:2,n,replace=TRUE)] 
) 

выше создает образце data.frame

Размер, ок 200MB:

paste0("size: ", round(as.numeric(object.size(d))/1000000,1)," mb") 

Далее я создаю сессию искры:

Sys.setenv(SPARK_HOME='C:\\soft\\spark-2.0.0-bin-hadoop2.7',HADOOP_HOME='C:\\soft\\hadoop') 
.libPaths(c(file.path(Sys.getenv('SPARK_HOME'), 'R', 'lib'),.libPaths())) 
Sys.setenv('SPARKR_SUBMIT_ARGS'='"sparkr-shell"') 

library(SparkR) 
library(rJava) 
sparkR.session(enableHiveSupport = FALSE,master = "local[*]", sparkConfig = list(spark.driver.memory = "1g",spark.sql.warehouse.dir="C:\\soft\\hadoop\\bin")) 

Теперь я пытаюсь загрузить data.frame я создал выше в Спарк:

d_sd <- as.DataFrame(d) 

Эта команда принимает навсегда для запуска.

Есть ли что-то не так, что я делаю? Может ли это быть связано с классом() столбцов в моем исходном R-файле.frame? Должен ли я использовать альтернативный подход для загрузки больших наборов данных из R в Spark? Если да, не стесняйтесь что-то предлагать.

Заранее спасибо.

PS:

Я могу быстро конвертировать и обрабатывать небольшие наборы данных в Спарк с помощью этого метода.

Вот некоторая справочная информация о моей R сессии и OS я бегу:

R версия 3.2.5 (2016-04-14) Платформа: x86_64-w64-mingw32/x64 (64-разрядная версия) Работает под: Windows 7 x64 (сборка 7601) Пакет обновления 1

Я запускаю версию Microsoft R (Revolution) под Windows 7 Professional (64 бит), 8 ГБ ОЗУ. Процессор: i5-2520M @ 2.50GHz


EDIT 2016-09-19:

Спасибо, Zeydy Ortiz и Мохит Бансал. На основании ваших ответов, я попытался следующий, но я по-прежнему сталкиваюсь с тем же вопросом:

Sys.setenv(SPARK_HOME='C:\\soft\\spark-2.0.0-bin-hadoop2.7',HADOOP_HOME='C:\\soft\\hadoop') 
.libPaths(c(file.path(Sys.getenv('SPARK_HOME'), 'R', 'lib'),.libPaths())) 
Sys.setenv('SPARKR_SUBMIT_ARGS'='"sparkr-shell"') 

library(SparkR) 
library(rJava) 
sparkR.session(enableHiveSupport = FALSE,master = "local[*]", sparkConfig = list(spark.driver.memory = "1g",spark.sql.warehouse.dir="C:\\soft\\hadoop\\bin")) 


n=5e6 # set sample size 

d_sd <- createDataFrame(sqlContext,data=data.frame(
     v1=base::sample(1:9,n,replace=TRUE), 
     v2=base::sample(1000:9000,n,replace=TRUE), 
     v3=seq(as.Date("2016-01-01"), as.Date("2016-12-31"), by = "day")[base::sample(1:365,n,replace=TRUE)], 
     v4=LETTERS[base::sample(1:length(LETTERS),n,replace=TRUE)], 
     v5=base::sample(1000:9000,n,replace=TRUE)/1000, 
     v6=seq(ISOdate(2016,1,1), ISOdate(2018,1,1), "sec")[base::sample(1:63158401,n,replace=TRUE)], 
     v7=c(TRUE,FALSE)[base::sample(1:2,n,replace=TRUE)] 
    )) 

командой, которая преобразует R ФР Спарк ДФ был запущен в течение нескольких часов. Пришлось отменить. Пожалуйста помогите.


РЕДАКТИРОВАТЬ 2016-12-14:

Выше была сделана попытка с помощью Спарк 1.6.1 и 3.2.0 R. Я недавно попробовал это, используя Spark 2.0.2 (последний) и R 3.2.5, и я столкнулся с той же проблемой.

Любая помощь была бы принята с благодарностью.

ответ

2

Это связано с ограничениями памяти, почему вы должны сначала создать базовый Dataframe и преобразовать его в Spark DataFrame?

Вы можете комбинировать оба шага в один и получить результаты:

Sys.setenv(SPARK_HOME='C:\\soft\\spark-2.0.0-bin-hadoop2.7',HADOOP_HOME='C:\\soft\\hadoop') 
.libPaths(c(file.path(Sys.getenv('SPARK_HOME'), 'R', 'lib'),.libPaths())) 
Sys.setenv('SPARKR_SUBMIT_ARGS'='"sparkr-shell"') 

library(SparkR) 
library(rJava) 
sparkR.session(enableHiveSupport = FALSE,master = "local[*]", sparkConfig = list(spark.driver.memory = "1g",spark.sql.warehouse.dir="C:\\soft\\hadoop\\bin")) 

Затем вы можете загрузить свой SDF:

n=5e6 # set sample size 

d_sd <- as.DataFrame(data.frame(
    v1=base::sample(1:9,n,replace=TRUE), 
    v2=base::sample(1000:9000,n,replace=TRUE), 
    v3=seq(as.Date("2016-01-01"), as.Date("2016-12-31"), by = "day")[base::sample(1:365,n,replace=TRUE)], 
    v4=LETTERS[base::sample(1:length(LETTERS),n,replace=TRUE)], 
    v5=base::sample(1000:9000,n,replace=TRUE)/1000, 
    v6=seq(ISOdate(2016,1,1), ISOdate(2018,1,1), "sec")[base::sample(1:63158401,n,replace=TRUE)], 
    v7=c(TRUE,FALSE)[base::sample(1:2,n,replace=TRUE)] 
)) 

Вы также можете сослаться на подобный вопрос: How best to handle converting a large local data frame to a SparkR data frame?

+0

Спасибо, Мохит Бансал, я начал новую сессию R, и попробовал ваш подход, это не помогло. R работает в течение нескольких минут. –

+0

Как можно манипулировать большими наборами данных в SparkR? –

+0

@ KirillSavine следуйте следующему вопросу: http://stackoverflow.com/questions/39392327/how-best-to-handle-converting-a-large-local-data-frame-to-a-sparkr-data-frame –

0

В Spark 2.0.0 используйте createDataFrame(d)

+0

Спасибо, Зейди Ортис, я пробовал ваш подход. Функция createDataFrame() также чрезвычайно медленна, используя данные в моем вопросе. Он работает уже некоторое время. Никогда не закончил. –