2015-12-08 12 views
0

Я пытаюсь получить доступ к значению аккумулятора в задаче кластера. Но когда я делаю так, он бросает исключение:Как получить доступ к значению аккумулятора в задачах?

не может прочитать значение аккумулятора в

Я пытался использовать row.localValue но он возвращает те же номера. Есть ли обходной путь?

private def modifyDataset(
    data: String, row: org.apache.spark.Accumulator[Int]): Array[Int] = { 

    var line = data.split(",") 
    var lineSize = line.size  
    var pairArray = new Array[Int](lineSize-1) 
    var a = row.value 
    paiArray(0)=a 

    row+=1 
    pairArray 

} 


var sc = Spark_Context.InitializeSpark 
var row = sc.accumulator(1, "Rows") 

var dataset = sc.textFile("path") 

var pairInfoFile = noHeaderRdd.flatMap{ data => modifyDataset(data,row) } 
    .persist(StorageLevel.MEMORY_AND_DISK)   
pairInfoFile.count() 

ответ

0

Это просто невозможно, и нет обходной путь. Искры accumulators являются переменными только для записи с точки зрения рабочего. Любая попытка прочитать его значение во время задачи не имеет смысла, поскольку между работниками и общим значением аккумулятора нет общего состояния, которое отражает только состояние текущего раздела.

Вообще говоря, accumulators предназначены в основном для диагностики и не должны использоваться в качестве части логики приложения. При использовании внутри преобразований единственная гарантия, которую вы получаете, - это как минимум однократное выполнение.

См. Также: How to print accumulator variable from within task (seem to "work" without calling value method)?