С DataFrame называется lastTail
, я могу перебирать, как это:Спарк Scala DataFrame однорядные преобразование в формат JSON для PostrgeSQL вставки
import scalikejdbc._
// ...
// Do Kafka Streaming to create DataFrame lastTail
// ...
lastTail.printSchema
lastTail.foreachPartition(iter => {
// open database connection from connection pool
// with scalikeJDBC (to PostgreSQL)
while(iter.hasNext) {
val item = iter.next()
println("****")
println(item.getClass)
println(item.getAs("fileGid"))
println("Schema: "+item.schema)
println("String: "+item.toString())
println("Seqnce: "+item.toSeq)
// convert this item into an XXX format (like JSON)
// write row to DB in the selected format
}
})
Это выводит "что-то вроде" (с редакцией): root |-- fileGid: string (nullable = true) |-- eventStruct: struct (nullable = false) | |-- eventIndex: integer (nullable = true) | |-- eventGid: string (nullable = true) | |-- eventType: string (nullable = true) |-- revisionStruct: struct (nullable = false) | |-- eventIndex: integer (nullable = true) | |-- eventGid: string (nullable = true) | |-- eventType: string (nullable = true)
и (только с одной итерации пункта - отредактированный, но, надеюсь, с достаточно хорошим синтаксисом, а)
**** class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema 12345 Schema: StructType(StructField(fileGid,StringType,true), StructField(eventStruct,StructType(StructField(eventIndex,IntegerType,true), StructField(eventGid,StringType,true), StructField(eventType,StringType,true)), StructField(revisionStruct,StructType(StructField(eventIndex,IntegerType,true), StructField(eventGid,StringType,true), StructField(eventType,StringType,true), StructField(editIndex,IntegerType,true)),false)) String: [12345,[1,4,edit],[1,4,revision]] Seqnce: WrappedArray(12345, [1,4,edit], [1,4,revision])
Примечание: Я делаю часть, как val metric = iter.sum
, на https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/TransactionalPerPartition.scala, но вместо этого с DataFrames. Я также следую «Шаблонам проектирования для использования foreachRDD», указанному в http://spark.apache.org/docs/latest/streaming-programming-guide.html#performance-tuning.
Как я могу преобразовать этот org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema (см https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/rows.scala) итерационного элемента в то, что легко написано (JSON или ... - Я открыт) в PostgreSQL. (Если не JSON, пожалуйста, предложите, как прочитать это значение обратно в DataFrame для использования в другом месте.)
Еще лучше - @ zero323 указал мне на эту тему, чтобы улучшить первую часть моего ответа (т. Е. Удаление zip) - http://stackoverflow.com/questions/36157810/spark-row-to-json – codeaperature