2017-01-23 6 views
2

В реализации Java/Scala/Python Spark можно просто вызвать метод foreachRDD или DataFrame, чтобы распараллелить итерации по набору данных.SparkR foreach loop

В SparkR Я не могу найти такую ​​инструкцию. Каким будет правильный способ перебора строк из строки DataFrame?

Я мог найти только функции gapply и dapply, но я не хочу вычислять новые значения столбца, просто хочу сделать что-то, взяв один элемент из списка параллельно.

Моя предыдущая попытка была с lapply

inputDF <- read.df(csvPath, "csv", header = "true", inferSchema = "true", na.strings = "") 
createOrReplaceTempView(inputDF,'inputData') 

distinctM <- sql('SELECT DISTINCT(ID_M) FROM inputData') 

collected <- collect(distinctM)[[1]] 

problemSolver <- function(idM) { 
    filteredDF <- filter(inputDF, inputDF$ID_M == idM) 
} 

spark.lapply(c(collected), problemSolver) 

, но я получаю эту ошибку:

Error in handleErrors(returnStatus, conn) : 
    org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 5.0 failed 1 times, most recent failure: Lost task 1.0 in stage 5.0 (TID 207, localhost, executor driver): org.apache.spark.SparkException: R computation failed with 
Error in callJMethod([email protected], "col", c) : 
    Invalid jobj 3. If SparkR was restarted, Spark operations need to be re-executed. 
Calls: compute ... filter -> $ -> $ -> getColumn -> column -> callJMethod 

Что бы решение предусмотрено R для решения таких проблем?

ответ

3

У меня была аналогичная проблема. Сбор DataFrame помещает его в R в качестве фрейма данных. Оттуда вы можете получить в каждой строке, как обычно, в обычном старом R. На мой взгляд, это ужасный мотив для обработки данных, поскольку вы теряете параллельную обработку, которую обеспечивает Spark. Вместо сбора данных и последующей фильтрации используйте встроенные функции SparkR, select, filter и т. Д. Если вы хотите делать рядовые операторы, встроенные функции SparkR обычно будут делать это за вас, в противном случае я нашел selectExpr или expr, чтобы быть очень полезен, когда исходные функции Spark предназначены для работы с одним значением (думаю: from_unix_timestamp)

Таким образом, чтобы получить то, что вы хотите, я хотел бы попробовать что-то вроде этого (я на SparkR 2.0+):

Frist чтения в данных, как вы сделали:

inputDF<- read.df(csvPath, "csv", header = "true", inferSchema = "true", na.strings = "") 

Затем сделайте это RDD: inputSparkDF<- SparkR::createDataFrame(inputDF)

Далее выделить только отдельные/уникальные значения (я использую magrittr для трубопроводов (работает в SparkR)):

distinctSparkDF<- SparkR::select(inputSparkDF) %>% SparkR::distinct() 

Отсюда, вы можете применить фильтр еще при жизни в мире Спарк в:

filteredSparkDF<- SparkR::filter(distinctSparkDF, distinctSparkDF$variable == "value")

После Спарк фильтруется, что данные для вас, то имеет смысл собрать подмножество в базовой R как последний шаг в технологическом процессе:

myRegularRDataframe<- SparkR::collect(filteredSparkDF)

Надеюсь, это поможет. Удачи. --nate