2017-02-18 6 views
1

Я читаю данные из RDD элемента типа com.google.gson.JsonObject. Попытка конвертировать это в DataSet, но не знаю, как это сделать.Spark Как RDD [JSONObject] в набор данных

import com.google.gson.{JsonParser} 
import org.apache.hadoop.io.LongWritable 
import org.apache.spark.sql.{SparkSession} 

object tmp { 
    class people(name: String, age: Long, phone: String) 
    def main(args: Array[String]): Unit = { 
    val spark = SparkSession.builder().master("local[*]").getOrCreate() 
    val sc = spark.sparkContext 

    val parser = new JsonParser(); 
    val jsonObject1 = parser.parse("""{"name":"abc","age":23,"phone":"0208"}""").getAsJsonObject() 
    val jsonObject2 = parser.parse("""{"name":"xyz","age":33}""").getAsJsonObject() 

    val PairRDD = sc.parallelize(List(
     (new LongWritable(1l), jsonObject1), 
     (new LongWritable(2l), jsonObject2) 
    )) 

    val rdd1 =PairRDD.map(element => element._2) 

    import spark.implicits._ 

    //How to create Dataset as schema People from rdd1? 
    } 
} 

Даже попытка печати rdd1 элементов бросает

object not serializable (class: org.apache.hadoop.io.LongWritable, value: 1) 
- field (class: scala.Tuple2, name: _1, type: class java.lang.Object) 
- object (class scala.Tuple2, (1,{"name":"abc","age":23,"phone":"0208"})) 

В основном я получаю это РДД [LongWritable, JsonParser] из BigQuery таблицы, которую я хочу, чтобы преобразовать в Dataset, так что я могу применить SQL для преобразования.

Я оставил телефон во второй записи пустым намеренно, BigQuery ничего не возвращает для этого элемента с нулевым значением.

ответ

1

Спасибо за разъяснение. Вам необходимо зарегистрировать класс как Serializable в kryo. Следующие шоу работают. Я бегу в свече скорлупе поэтому пришлось разрушить старый контекст и создать новую искру контекст с конфиг, который включал зарегистрированных классов Kryo

import com.google.gson.{JsonParser} 
import org.apache.hadoop.io.LongWritable 
import org.apache.spark.SparkContext 

sc.stop() 

val conf = sc.getConf 
conf.registerKryoClasses(Array(classOf[LongWritable], classOf[JsonParser])) 
conf.get("spark.kryo.classesToRegister") 

val sc = new SparkContext(conf) 

val parser = new JsonParser(); 
val jsonObject1 = parser.parse("""{"name":"abc","age":23,"phone":"0208"}""").getAsJsonObject() 
val jsonObject2 = parser.parse("""{"name":"xyz","age":33}""").getAsJsonObject() 

val pairRDD = sc.parallelize(List(
    (new LongWritable(1l), jsonObject1), 
    (new LongWritable(2l), jsonObject2) 
)) 


val rdd = pairRDD.map(element => element._2) 

rdd.collect() 
// res9: Array[com.google.gson.JsonObject] = Array({"name":"abc","age":23,"phone":"0208"}, {"name":"xyz","age":33}) 

val jsonstrs = rdd.map(e=>e.toString).collect() 
val df = spark.read.json(sc.parallelize(jsonstrs))  
df.printSchema 
// root 
// |-- age: long (nullable = true) 
// |-- name: string (nullable = true) 
// |-- phone: string (nullable = true) 
+0

Благодаря Shoaib, я отредактировал мой вопрос, если это дает больше представления. – xstack2000

 Смежные вопросы

  • Нет связанных вопросов^_^