У меня есть рекурсивная функция, которая должна сравнивать результаты текущего вызова с предыдущим вызовом, чтобы выяснить, достигли ли они конвергенции. Моя функция не содержит action
- она содержит только map
, flatMap
и reduceByKey
. Поскольку Spark не оценивает преобразования (пока не будет вызвано действие), моя следующая итерация не получит правильные значения для сравнения для конвергенции.Spark - как обращаться с ленивой оценкой в случае итеративных (или рекурсивных) вызовов функций
Вот скелет функции -
def func1(sc: SparkContext, nodes:RDD[List[Long]], didConverge: Boolean, changeCount: Int) RDD[(Long] = {
if (didConverge)
nodes
else {
val currChangeCount = sc.accumulator(0, "xyz")
val newNodes = performSomeOps(nodes, currChangeCount) // does a few map/flatMap/reduceByKey operations
if (currChangeCount.value == changeCount) {
func1(sc, newNodes, true, currChangeCount.value)
} else {
func1(sc, newNode, false, currChangeCount.value)
}
}
}
performSomeOps
содержит только map
, flatMap
и reduceByKey
преобразования. Поскольку он не имеет никаких действий, код в performSomeOps
не выполняется. Таким образом, мой currChangeCount
не получает фактического счета. Из чего следует, что условие проверки конвергенции (currChangeCount.value == changeCount
) будет недействительным. Одним из способов преодоления является принудительное действие в каждой итерации путем вызова count
, но это лишние накладные расходы.
Мне интересно, что я могу сделать, чтобы заставить действие без накладных расходов или есть другой способ решить эту проблему?
Хорошие очки! Я должен был заметить ограничения на использование аккумулятора. Кроме того, я изменил свой 'performSomeOps', чтобы включить сокращение (для вычисления changeCount). Сейчас он работает нормально. Спасибо за помощь. –