В реализации Java/Scala/Python Spark можно просто вызвать метод foreach
RDD
или DataFrame
, чтобы распараллелить итерации по набору данных.SparkR foreach loop
В SparkR Я не могу найти такую инструкцию. Каким будет правильный способ перебора строк из строки DataFrame
?
Я мог найти только функции gapply
и dapply
, но я не хочу вычислять новые значения столбца, просто хочу сделать что-то, взяв один элемент из списка параллельно.
Моя предыдущая попытка была с lapply
inputDF <- read.df(csvPath, "csv", header = "true", inferSchema = "true", na.strings = "")
createOrReplaceTempView(inputDF,'inputData')
distinctM <- sql('SELECT DISTINCT(ID_M) FROM inputData')
collected <- collect(distinctM)[[1]]
problemSolver <- function(idM) {
filteredDF <- filter(inputDF, inputDF$ID_M == idM)
}
spark.lapply(c(collected), problemSolver)
, но я получаю эту ошибку:
Error in handleErrors(returnStatus, conn) :
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 5.0 failed 1 times, most recent failure: Lost task 1.0 in stage 5.0 (TID 207, localhost, executor driver): org.apache.spark.SparkException: R computation failed with
Error in callJMethod([email protected], "col", c) :
Invalid jobj 3. If SparkR was restarted, Spark operations need to be re-executed.
Calls: compute ... filter -> $ -> $ -> getColumn -> column -> callJMethod
Что бы решение предусмотрено R для решения таких проблем?