2016-06-28 3 views
1

Я пытался извлечь информацию из 1-столбцовой информационной диаграммы Spark, состоящей из парных разрядов, и поместил ее в Breeze SparseVector. Для этого я просматриваю каждый элемент моего 1-столбца DataFrame, заставляя его быть двойным, а затем добавляем его в VectorBuilder. My VectorBuilder правильно мутирует свое состояние в цикле foreach, после чего все изменения очищаются после завершения цикла. Почему это происходит? Есть ли обходной путь?Построение разреженногоVector из значений в DataFrame

EDIT 1:

Я бегу это локально с 1 ядром; это не на кластерном

Код:

val accum = sc.accumulator(0, "Counter") 

def correlate() : Unit = { 

    val cols = df.columns 
    val id = cols(0)  
    val id2 = cols(1) 

    //id1 and id2 are there for 
    val df1 = sqlContext.sql(s"SELECT ${id} FROM dataset WHERE (${id} IS NOT NULL AND ${id2} IS NOT NULL)") 

    /* df1 is a dataframe that has 1 column*/ 
    df1.show(); 
    accum.value_=(0); 

    /******************** Problem starts here **********************/ 
    val builder = new VectorBuilder[Double](5) 
    df1.foreach{ x => 
    x(0) match{    
     case d : Double => 
     builder.add(accum.value, d); 
     //This print statement prints out correct values 
     println(s"index: ${accum.value} value: ${builder(accum.value)}")  
     accum.value += 1; 
     println(s"builder's active size in loop: ${builder.activeSize}") 
     case _ => throw new ClassCastException("Pattern-Matching for Double failed"); 
    } 
    } 
    //temp becomes empty at this point 
    println(s"builder's active size out of loop: ${builder.activeSize}") 

    val sparse = builder.toSparseVector  
    sparse.foreachPair{(i,v) => println(s"index: ${i} and value: ${v}")} 
} 
this.correlate() 

Выход:

+-------+ 
| RowX| 
+-------+ 
| 145.0| 
| -1.0| 
|-212.21| 
| 23.3| 
| 21.4| 
+-------+ 

index: 0 value: 145.0 
builder's active size in loop: 1 
index: 1 value: -1.0 
builder's active size in loop: 2 
index: 2 value: -212.21 
builder's active size in loop: 3 
index: 3 value: 23.3 
builder's active size in loop: 4 
index: 4 value: 21.4 
builder's active size in loop: 5 

//the loop ends here and builder's state disappears 

builder's active size out of loop: 0 
index: 0 and value: 0.0 
index: 1 and value: 0.0 
index: 2 and value: 0.0 
index: 3 and value: 0.0 
index: 4 and value: 0.0 

ответ

1

Это добавляет к локальной копии строителя для каждого работника. Получить локальный объект:

SparseVector(df1.rdd.map(_.getDouble(0)).collect)