0

У меня есть рекурсивная функция, которая должна сравнивать результаты текущего вызова с предыдущим вызовом, чтобы выяснить, достигли ли они конвергенции. Моя функция не содержит action - она ​​содержит только map, flatMap и reduceByKey. Поскольку Spark не оценивает преобразования (пока не будет вызвано действие), моя следующая итерация не получит правильные значения для сравнения для конвергенции.Spark - как обращаться с ленивой оценкой в ​​случае итеративных (или рекурсивных) вызовов функций

Вот скелет функции -

def func1(sc: SparkContext, nodes:RDD[List[Long]], didConverge: Boolean, changeCount: Int) RDD[(Long] = { 

    if (didConverge) 
     nodes 
    else { 
     val currChangeCount = sc.accumulator(0, "xyz")   
     val newNodes = performSomeOps(nodes, currChangeCount) // does a few map/flatMap/reduceByKey operations 
     if (currChangeCount.value == changeCount) { 
      func1(sc, newNodes, true, currChangeCount.value) 
     } else { 
      func1(sc, newNode, false, currChangeCount.value) 
     } 
    } 
} 

performSomeOps содержит только map, flatMap и reduceByKey преобразования. Поскольку он не имеет никаких действий, код в performSomeOps не выполняется. Таким образом, мой currChangeCount не получает фактического счета. Из чего следует, что условие проверки конвергенции (currChangeCount.value == changeCount) будет недействительным. Одним из способов преодоления является принудительное действие в каждой итерации путем вызова count, но это лишние накладные расходы.

Мне интересно, что я могу сделать, чтобы заставить действие без накладных расходов или есть другой способ решить эту проблему?

ответ

1

Я считаю, что это очень important thing вы упускаете здесь:

Для обновления аккумулятора выполненных внутри действий только, искровые гарантирует, что обновление каждой задачи к аккумулятору будет применяться только один раз, т.е. перезапущен задачи не будут обновлять значение. В преобразованиях пользователи должны знать, что обновление каждой задачи может применяться несколько раз, если задачи или этапы работы будут повторно выполняться.

Из-за этого аккумуляторы не могут быть надежно использованы для управления потоком управления и лучше подходят для мониторинга работы.

Кроме того, выполнение действия не является ненужными накладными расходами. Если вы хотите знать, что является результатом вычисления, вы должны выполнить его. Если, конечно, результат тривиален. Самое дешевое возможное действие:

rdd.foreach { case _ => } 

, но он не будет устранять проблему, возникшую здесь.

В общих итеративных вычислений в Спарк может быть структурирована следующим образом:

def func1(chcekpoinInterval: Int)(sc: SparkContext, nodes:RDD[List[Long]], 
    didConverge: Boolean, changeCount: Int, iteration: Int) RDD[(Long] = { 

    if (didConverge) nodes 
    else { 

    // Compute and cache new nodes 
    val newNodes = performSomeOps(nodes, currChangeCount).cache 

    // Periodically checkpoint to avoid stack overflow 
    if (iteration % checkpointInterval == 0) newNodes.checkpoint 

    /* Call a function which computes values 
    that determines control flow. This execute an action on newNodes. 
    */ 
    val changeCount = computeChangeCount(newNodes) 

    // Unpersist old nodes 
    nodes.unpersist 

    func1(checkpointInterval)(
     sc, newNodes, currChangeCount.value == changeCount, 
     currChangeCount.value, iteration + 1 
    ) 
    } 
} 
+0

Хорошие очки! Я должен был заметить ограничения на использование аккумулятора. Кроме того, я изменил свой 'performSomeOps', чтобы включить сокращение (для вычисления changeCount). Сейчас он работает нормально. Спасибо за помощь. –

0

Я вижу, что эти map/flatMap/reduceByKey преобразования обновляются аккумулятором. Поэтому единственный способ выполнить все обновления - выполнить все эти функции, а count - это самый простой способ добиться этого и дает наименьшие накладные расходы по сравнению с другими способами (cache + count, first или collect).

 Смежные вопросы

  • Нет связанных вопросов^_^