мой аккумулятор - это массив [Array [Int]] после обновления накопителя при работе RDD, аккумулятор (0), как и ожидалось, где в качестве аккумулятора (1) находится массив (0,0,0), который полностью потерянЗначение искрового накопителя различается, если внутри RDD и снаружи RDD
внутри RDD, значение аккумулятора Array (Array (4,5,6), Array (4,5,6)) внешний RDD, значение аккумулятора Array (Array (4,5,6), массив (0,0,0))
ниже приведен код
import org.apache.spark.Accumulable
import org.apache.spark.AccumulableParam
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
object acc {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("Simple Application")
val sc = new SparkContext(conf)
val a =Array(Array(1,2,3),Array(4,5,6))
val rdd = sc.parallelize(a)
val initialValue = Array.fill[Array[Int]](2)(Array.fill[Int](3)(1))
val accumulator = sc.accumulable(initialValue)(MatrixAccumulatorParam)
rdd.foreach{x=>
accumulator += (x(0),0,0)
accumulator += (x(1),0,1)
accumulator += (x(2),0,2)
accumulator += (x(0),1,0)
accumulator += (x(1),1,1)
accumulator += (x(2),1,2)
println("accumulator value in rdd is"+accumulator.localValue)
}
println("accumulator value out of rdd is :" + accumulator.value)
}
}
object MatrixAccumulatorParam extends AccumulableParam[Array[Array[Int]], (Int, Int, Int)] {
def zero(initialValue: Array[Array[Int]]): Array[Array[Int]] = {
initialValue
}
def addAccumulator(acc: Array[Array[Int]], value: (Int, Int, Int)): Array[Array[Int]] = {
acc(value._2)(value._3) = value._1
acc
}
def addInPlace(m1: Array[Array[Int]], m2: Array[Array[Int]]): Array[Array[Int]] = {
val columnLength: Int = m1.length
val rowLength: Int = m1(0).length
var updatedMatrix = Array.ofDim[Int](columnLength, rowLength)
var j: Int = 0
while (j < columnLength) {
var i =0
while (i < rowLength) {
val a = Math.max(m1(j)(i), m2(j)(i))
updatedMatrix(j)(i) = a
i += 1
}
j += 1
}
updatedMatrix
}
}
Результаты: внутри RDD, значение накопителя - Array (Array (4,5,6), Array (4,5,6)) вне RDD, значение накопителя - Array (Array (4,5,6), Array (0,0,0))
но то, что я ожидал вне РДА является Array (Array (4,5,6), массив (4,5,6))
Эта проблема решена? –