2015-11-16 7 views
0

Я пытаюсь вернуть RDD[(String,String,String)], и я не могу это сделать, используя flatMap. Я попытался (tweetId, tweetBody, gender) и (tweetId, tweetBody, gender), но он дает мне ошибку типа несоответствия вы можете Guid мне знать, как я могу вернуть RDD[(String, String, String)] из flatMapошибка несоответствие в scala

override def transform(sqlContext: SQLContext, rdd: RDD[Array[Byte]], config: UserTransformConfig, logger: PhaseLogger): DataFrame = { 
    val idColumnName = config.getConfigString("column_name").getOrElse("id") 
    val bodyColumnName = config.getConfigString("column_name").getOrElse("body") 
    val genderColumnName = config.getConfigString("column_name").getOrElse("gender") 

    // convert each input element to a JsonValue 
    val jsonRDD = rdd.map(r => byteUtils.bytesToUTF8String(r)) 

    val hashtagsRDD: RDD[(String,String, String)] = jsonRDD.mapPartitions(r => { 
     // register jackson mapper (this needs to be instantiated per partition 
     // since it is not serializable) 
     val mapper = new ObjectMapper() 
     mapper.registerModule(DefaultScalaModule) 

     r.flatMap(tweet => tweet match { 
     case _ :: tweet => 
     val rootNode = mapper.readTree(tweet) 
     val tweetId = rootNode.path("id").asText.split(":")(2) 
     val tweetBody = rootNode.path("body").asText 
     val tweetVector = new HashingTF().transform(tweetBody.split(" ")) 
     val result =genderModel.predict(tweetVector) 
     val gender = if(result == 1.0){"Male"}else{"Female"} 

     (tweetId, tweetBody, gender) 
     // Array(1).map(x => (tweetId, tweetBody, gender)) 

     }) 

    }) 

    val rowRDD: RDD[Row] = hashtagsRDD.map(x => Row(x._1,x._2,x._3)) 
    val schema = StructType(Array(StructField(idColumnName,StringType, true),StructField(bodyColumnName, StringType, true),StructField(genderColumnName,StringType, true))) 
    sqlContext.createDataFrame(rowRDD, schema) 
    } 
} 
+0

Опишите свою проблему немного больше – Odomontois

+0

Я пытаюсь вернуться с помощью RDD [String, String, String], и я не могу это сделать, используя плоскую карту. Я попытался (tweetId, tweetBody, пол) и {tweetId, tweetBody, пол} , но это дает мне и ошибки типа несоответствия вы можете Guid мне знать, как я могу вернуть RDD [(String, String, String) ] from flatmap –

+0

Пожалуйста, добавьте соответствующую информацию на ваш вопрос, отправьте текст ошибки и исправьте форматирование – Odomontois

ответ

0

Try использовать map вместо flatMap. flatMap используется в тех случаях, когда результат функции параметра есть или RDD

I.e. flatMap используется, когда каждый элемент текущей коллекции отображается в ноль или более элементов. Пока map используется, когда каждый элемент текущей коллекции отображается ровно на один элемент.

map с A => B обмена символов A с символом B в functorial types, т.е. преобразует RDD[A] к RDD[B]

flatMap может быть прочитан как map затем flatten в monadic types. Например. у вас есть и RDD[A] и функции параметр имеет тип A => RDD[B] результат простого map будет RDD[RDD[B]] и что пара вхождений может быть упрощена только RDD[B] через flatten

Вот пример скомпилированного кода успешно.

import com.fasterxml.jackson.databind.ObjectMapper 
import com.fasterxml.jackson.module.scala.DefaultScalaModule 
import org.apache.spark.rdd.RDD 
import org.apache.spark.sql._ 
import org.apache.spark.sql.types.{StringType, StructField, StructType} 

class UserTransformConfig { 
    def getConfigString(name: String): Option[String] = ??? 
} 

class PhaseLogger 
object byteUtils { 
    def bytesToUTF8String(r: Array[Byte]): String = ??? 
} 

class HashingTF { 
    def transform(strs: Array[String]): Array[Double] = ??? 
} 

object genderModel { 
    def predict(v: Array[Double]): Double = ??? 
} 

def transform(sqlContext: SQLContext, rdd: RDD[Array[Byte]], config: UserTransformConfig, logger: PhaseLogger): DataFrame = { 
    val idColumnName = config.getConfigString("column_name").getOrElse("id") 
    val bodyColumnName = config.getConfigString("column_name").getOrElse("body") 
    val genderColumnName = config.getConfigString("column_name").getOrElse("gender") 

    // convert each input element to a JsonValue 
    val jsonRDD = rdd.map(r => byteUtils.bytesToUTF8String(r)) 

    val hashtagsRDD: RDD[(String, String, String)] = jsonRDD.mapPartitions(r => { 
    // register jackson mapper (this needs to be instantiated per partition 
    // since it is not serializable) 
    val mapper = new ObjectMapper 
    mapper.registerModule(DefaultScalaModule) 

    r.map { tweet => 
     val rootNode = mapper.readTree(tweet) 
     val tweetId = rootNode.path("id").asText.split(":")(2) 
     val tweetBody = rootNode.path("body").asText 
     val tweetVector = new HashingTF().transform(tweetBody.split(" ")) 
     val result = genderModel.predict(tweetVector) 
     val gender = if (result == 1.0) {"Male"} else {"Female"} 

     (tweetId, tweetBody, gender) 

    } 
    }) 

    val rowRDD: RDD[Row] = hashtagsRDD.map(x => Row(x._1, x._2, x._3)) 
    val schema = StructType(Array(StructField(idColumnName, StringType, true), StructField(bodyColumnName, StringType, true), StructField(genderColumnName, StringType, true))) 
    sqlContext.createDataFrame(rowRDD, schema) 
} 

обратите внимание, сколько я должен привезти из моего воображения, потому что вы не поставить minimum example. В целом вопросы, подобные этому, не стоит отвечать

+0

, но результат вышеуказанного кода: RDD [(string, string, strin)] –

+0

@AhmedAlashrafy просмотрите ответное обновление – Odomontois

+0

Спасибо за ваши усилия, чтобы ответить на мой вопрос. Но то, что мне нужно –