0

С DataFrame называется lastTail, я могу перебирать, как это:Спарк Scala DataFrame однорядные преобразование в формат JSON для PostrgeSQL вставки

import scalikejdbc._ 
// ... 
// Do Kafka Streaming to create DataFrame lastTail 
// ... 

lastTail.printSchema 

lastTail.foreachPartition(iter => { 

// open database connection from connection pool 
// with scalikeJDBC (to PostgreSQL) 

    while(iter.hasNext) { 
    val item = iter.next() 
    println("****") 
    println(item.getClass) 
    println(item.getAs("fileGid")) 
    println("Schema: "+item.schema) 
    println("String: "+item.toString()) 
    println("Seqnce: "+item.toSeq) 

    // convert this item into an XXX format (like JSON) 
    // write row to DB in the selected format 
    } 
}) 

Это выводит "что-то вроде" (с редакцией): root |-- fileGid: string (nullable = true) |-- eventStruct: struct (nullable = false) | |-- eventIndex: integer (nullable = true) | |-- eventGid: string (nullable = true) | |-- eventType: string (nullable = true) |-- revisionStruct: struct (nullable = false) | |-- eventIndex: integer (nullable = true) | |-- eventGid: string (nullable = true) | |-- eventType: string (nullable = true)

и (только с одной итерации пункта - отредактированный, но, надеюсь, с достаточно хорошим синтаксисом, а)

**** class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema 12345 Schema: StructType(StructField(fileGid,StringType,true), StructField(eventStruct,StructType(StructField(eventIndex,IntegerType,true), StructField(eventGid,StringType,true), StructField(eventType,StringType,true)), StructField(revisionStruct,StructType(StructField(eventIndex,IntegerType,true), StructField(eventGid,StringType,true), StructField(eventType,StringType,true), StructField(editIndex,IntegerType,true)),false)) String: [12345,[1,4,edit],[1,4,revision]] Seqnce: WrappedArray(12345, [1,4,edit], [1,4,revision])

Примечание: Я делаю часть, как val metric = iter.sum, на https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/TransactionalPerPartition.scala, но вместо этого с DataFrames. Я также следую «Шаблонам проектирования для использования foreachRDD», указанному в http://spark.apache.org/docs/latest/streaming-programming-guide.html#performance-tuning.

Как я могу преобразовать этот org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema (см https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala) итерационного элемента в то, что легко написано (JSON или ... - Я открыт) в PostgreSQL. (Если не JSON, пожалуйста, предложите, как прочитать это значение обратно в DataFrame для использования в другом месте.)

ответ

0

Ну, я понял, как это сделать, как работа.

val ltk = lastTail.select($"fileGid").rdd.map(fileGid => fileGid.toString) 
val ltv = lastTail.toJSON 
val kvPair = ltk.zip(ltv) 

Тогда я бы просто перебирал RDD вместо DataFrame.

kvPair.foreachPartition(iter => { 
    while(iter.hasNext) { 
    val item = iter.next() 
    println(item.getClass) 
    println(item) 
    } 
}) 

Данные в сторону, я получаю class scala.Tuple2, что делает для более простой способ для хранения пар КВ в JDBC/PostgreSQL.

Я уверен, что могут быть и другие способы, которые не являются рабочими.

+0

Еще лучше - @ zero323 указал мне на эту тему, чтобы улучшить первую часть моего ответа (т. Е. Удаление zip) - http://stackoverflow.com/questions/36157810/spark-row-to-json – codeaperature