2014-11-04 2 views
0
// 4 workers 
    val sc = new SparkContext("local[4]", "naivebayes") 

    // Load documents (one per line). 
    val documents: RDD[Seq[String]] = sc.textFile("/tmp/test.txt").map(_.split(" ").toSeq) 

    documents.zipWithIndex.foreach{ 
    case (e, i) => 
    val collectedResult = Tokenizer.tokenize(e.mkString) 
    } 

    val hashingTF = new HashingTF() 
    //pass collectedResult instead of document 
    val tf: RDD[Vector] = hashingTF.transform(documents) 

    tf.cache() 
    val idf = new IDF().fit(tf) 
    val tfidf: RDD[Vector] = idf.transform(tf) 

в приведенном выше фрагменте кода, я хотел бы, чтобы извлечь collectedResult, чтобы использовать его для hashingTF.transform, как это может быть достигнуто, когда подпись разметить функцииконвертировать строку в Scala РДУ [след [строка]]

def tokenize(content: String): Seq[String] = { 
... 
} 

ответ

1

Похоже, вы хотите map, а не foreach. Я не понимаю, что вы используете zipWithIndex, а не за то, что вы звоните split на своих линиях, чтобы снова присоединиться к ним снова с помощью mkString.

val lines: Rdd[String] = sc.textFile("/tmp/test.txt") 
val tokenizedLines = lines.map(tokenize) 
val hashes = tokenizedLines.map(hashingTF) 
hashes.cache() 
... 
+0

@Imm как я объявляю другое rdd? жаль, что я новичок! – Siva

+0

Вы сказали, что хотите добавить возвращаемое значение этой функции к какому-либо другому 'RDD [Seq [String]]', no? 'otherRdd' - это тот, который вы хотите добавить. – lmm

+0

inp.zipWithIndex.foreach { случай (е, я) => вал результат: РДД [Seq [String]] ++ = sc.parallelize (Seq (Tokenizer.tokenize (е))) } я предполагаю, что это неправильно, я хотел объявить результат из цикла, добавить его и получить как один rdd. Я не уверен, как я могу это сделать. – Siva