2014-12-08 6 views
2

мой аккумулятор - это массив [Array [Int]] после обновления накопителя при работе RDD, аккумулятор (0), как и ожидалось, где в качестве аккумулятора (1) находится массив (0,0,0), который полностью потерянЗначение искрового накопителя различается, если внутри RDD и снаружи RDD

внутри RDD, значение аккумулятора Array (Array (4,5,6), Array (4,5,6)) внешний RDD, значение аккумулятора Array (Array (4,5,6), массив (0,0,0))

ниже приведен код

import org.apache.spark.Accumulable 
import org.apache.spark.AccumulableParam 
import org.apache.spark.SparkContext 
import org.apache.spark.SparkContext._ 
import org.apache.spark.SparkConf 

object acc { 
    def main(args: Array[String]) { 
    val conf = new SparkConf().setAppName("Simple Application") 
    val sc = new SparkContext(conf) 
    val a =Array(Array(1,2,3),Array(4,5,6)) 
    val rdd = sc.parallelize(a) 
    val initialValue = Array.fill[Array[Int]](2)(Array.fill[Int](3)(1)) 
    val accumulator = sc.accumulable(initialValue)(MatrixAccumulatorParam) 
    rdd.foreach{x=> 
    accumulator += (x(0),0,0) 
    accumulator += (x(1),0,1) 
    accumulator += (x(2),0,2) 
    accumulator += (x(0),1,0) 
    accumulator += (x(1),1,1) 
    accumulator += (x(2),1,2) 
    println("accumulator value in rdd is"+accumulator.localValue) 
    } 

    println("accumulator value out of rdd is :" + accumulator.value) 

    } 

} 
object MatrixAccumulatorParam extends AccumulableParam[Array[Array[Int]], (Int, Int, Int)] { 

    def zero(initialValue: Array[Array[Int]]): Array[Array[Int]] = { 
    initialValue 
    } 

    def addAccumulator(acc: Array[Array[Int]], value: (Int, Int, Int)): Array[Array[Int]] = { 

    acc(value._2)(value._3) = value._1 
    acc 

    } 

    def addInPlace(m1: Array[Array[Int]], m2: Array[Array[Int]]): Array[Array[Int]] = { 
    val columnLength: Int = m1.length 
    val rowLength: Int = m1(0).length 
    var updatedMatrix = Array.ofDim[Int](columnLength, rowLength) 

    var j: Int = 0 
    while (j < columnLength) { 
     var i =0 
    while (i < rowLength) { 
     val a = Math.max(m1(j)(i), m2(j)(i)) 
     updatedMatrix(j)(i) = a 
     i += 1 
     } 
     j += 1 
    } 

    updatedMatrix 
     } 


} 

Результаты: внутри RDD, значение накопителя - Array (Array (4,5,6), Array (4,5,6)) вне RDD, значение накопителя - Array (Array (4,5,6), Array (0,0,0))

но то, что я ожидал вне РДА является Array (Array (4,5,6), массив (4,5,6))

+0

Эта проблема решена? –

ответ

3

addAccumulator вызывается метод всякий раз, когда в накопителе обновляется.

В приведенном выше накопителе кода + = (x (0), 0,0) используется метод addAccumulator.

после завершения всех задач addInPlace метод вызван для суммирования накопленных значений из всех задач.

В приведенном выше коде initialValue Array (1, 1, 1) Array (1, 1, 1) и задаче Аккумуляторное значение Array (4, 5, 6) Массив (4, 5, 6) вызывает метод addInPlace.

В приведенной выше переменной кода я в addInPlace метод должен быть сброшен, когда он входит в петлю время (J < columnLength) {

Следующий код работает как шарм.

  while (j < columnLength) { 
       i=0 
       while (i < rowLength) { 
        println("m1(j)(i)"+ m1(j)(i)) 
        println(" m2(j)(i))"+ m2(j)(i)) 
        val a = Math.max(m1(j)(i), m2(j)(i)) 
          updatedMatrix(j)(i) = a 
          i += 1 
       } 
       j += 1 
      } 
+0

Я нашел, что результат такой же – Tim

0

localValue должен быть разным, в соответствии с документом:

  • Это не глобальное значение аккумулятора. Чтобы получить глобальное значение после
  • завершена операция по набору данных, позвоните по телефону value. *
  • Типичное использование этого метода заключается в непосредственном мутировании локального значения, например, для добавления
  • элемент в набор. */
0

Я нашел это никакой разницы с модифицировать вар я = 0 до I = 0 и конечный результат является массив (Array (4,5,6), Array (4,5,6))

Результат приложения извлекается журналами пряжи -applicationId.

Код:

import org.apache.spark.Accumulable 
import org.apache.spark.AccumulableParam 
import org.apache.spark.SparkContext 
import org.apache.spark.SparkContext._ 
import org.apache.spark.SparkConf 

object acc { 
    def main(args: Array[String]) { 
    //val conf = new SparkConf().setAppName("Simple Application") 
    val conf = new SparkConf() 
    conf.setSparkHome("/usr/lib/spark") 
    conf.setAppName("Simple Application") 
    val sc = new SparkContext(conf) 
    val a =Array(Array(1,2,3),Array(4,5,6)) 
    val rdd = sc.parallelize(a) 
    val initialValue = Array.fill[Array[Int]](2)(Array.fill[Int](3)(1)) 
    val accumulator = sc.accumulable(initialValue)(MatrixAccumulatorParam) 
    rdd.foreach{x=> 
    accumulator += (x(0),0,0) 
    accumulator += (x(1),0,1) 
    accumulator += (x(2),0,2) 
    accumulator += (x(0),1,0) 
    accumulator += (x(1),1,1) 
    accumulator += (x(2),1,2) 
    val columnLength: Int = accumulator.localValue.length 
    val rowLength: Int = accumulator.localValue(0).length 
    var j: Int = 0 
    var i: Int = 0 
    println("accumulator") 
    while(j < columnLength){ 
     i =0 
     while(i<rowLength){ 
      println(accumulator.localValue(j)(i)) 
      i += 1 
     } 
     j+=1 
    } 
    println("accumulator value in rdd is"+accumulator.localValue) 
    } 
    val columnLength: Int = accumulator.value.length 
    val rowLength: Int = accumulator.value(0).length 
    var j: Int = 0 
    var i: Int = 0 
    println("total") 
    while(j < columnLength){ 
     i =0 
     while(i<rowLength){ 
      println(accumulator.value(j)(i)) 
      i += 1 
     } 
     j+=1 
    } 

    println("accumulator value out of rdd is :" + accumulator.value) 

    } 

} 
object MatrixAccumulatorParam extends AccumulableParam[Array[Array[Int]], (Int, Int, Int)] { 

    def zero(initialValue: Array[Array[Int]]): Array[Array[Int]] = { 
    initialValue 
    } 

    def addAccumulator(acc: Array[Array[Int]], value: (Int, Int, Int)): Array[Array[Int]] = { 

    acc(value._2)(value._3) = value._1 
    acc 
    } 

    def addInPlace(m1: Array[Array[Int]], m2: Array[Array[Int]]): Array[Array[Int]] = { 
    val columnLength: Int = m1.length 
    val rowLength: Int = m1(0).length 
    var updatedMatrix = Array.ofDim[Int](columnLength, rowLength) 

    var j: Int = 0 
    var i: Int = 0 
    while (j < columnLength) { 
    i =0 
    while (i < rowLength) { 
     println("m1("+j+")("+i+")="+ m1(j)(i) + " m2("+j+")("+i+")="+ m2(j)(i)) 
     val a = Math.max(m1(j)(i), m2(j)(i)) 
     updatedMatrix(j)(i) = a 
     i += 1 
     } 
     j += 1 
    } 

    updatedMatrix 
    } 
} 

И результат:

accumulator 
4 
5 
6 
4 
5 
6 

accumulator 
1 
2 
3 
1 
2 
3 

m1(0)(0)=1 m2(0)(0)=1 
m1(0)(1)=1 m2(0)(1)=2 
m1(0)(2)=1 m2(0)(2)=3 
m1(1)(0)=1 m2(1)(0)=1 
m1(1)(1)=1 m2(1)(1)=2 
m1(1)(2)=1 m2(1)(2)=3 
m1(0)(0)=1 m2(0)(0)=4 
m1(0)(1)=2 m2(0)(1)=5 
m1(0)(2)=3 m2(0)(2)=6 
m1(1)(0)=1 m2(1)(0)=4 
m1(1)(1)=2 m2(1)(1)=5 
m1(1)(2)=3 m2(1)(2)=6 

total 
4 
5 
6 
4 
5 
6 

И изменить код для этого:

//var i: Int = 0 
    while (j < columnLength) { 
    var i =0 

И результат:

m1(0)(0)=1 m2(0)(0)=1 
m1(0)(1)=1 m2(0)(1)=2 
m1(0)(2)=1 m2(0)(2)=3 
m1(1)(0)=1 m2(1)(0)=1 
m1(1)(1)=1 m2(1)(1)=2 
m1(1)(2)=1 m2(1)(2)=3 
m1(0)(0)=1 m2(0)(0)=4 
m1(0)(1)=2 m2(0)(1)=5 
m1(0)(2)=3 m2(0)(2)=6 
m1(1)(0)=1 m2(1)(0)=4 
m1(1)(1)=2 m2(1)(1)=5 
m1(1)(2)=3 m2(1)(2)=6 
total 
4 
5 
6 
4 
5 
6 

accumulator 
1 
2 
3 
1 
2 
3 

accumulator 
4 
5 
6 
4 
5 
6 

Конечный результат такой же.

Но у меня есть два вопроса:

  • Я не знаю, почему порядок два вывода не совпадают.
  • Почему функция addInplace вызывается дважды?
    • Я думаю, что я знаю, почему эта функция будет вызван дважды, но я не уверен,
      • инициализации: Array (Array (1,1,1), Array (1,1,1)
      • вывод из задачи: массив (массив (1,2,3), массив (1,2,3)
      • вывод из другой задачи: массив (массив (4,5,6), массив (4,5) , 6)

@Vijay Innamuri