3

Я новичок в Scala. Я пытаюсь преобразовать список scala (который держит результаты некоторых вычисленных данных в исходном DataFrame) в Dataframe или Dataset. Я не нахожу никакого прямого метода для этого. Тем не менее, я попытался выполнить следующий процесс для преобразования моего списка в DataSet, но, похоже, он не работает. Ниже приведены три ситуации.Преобразование scala-списка в DataFrame или DataSet

Может кто-нибудь, пожалуйста, предоставит мне некоторый луч надежды, как это сделать? Благодарю.

import org.apache.spark.sql.{DataFrame, Row, SQLContext, DataFrameReader} 
import java.sql.{Connection, DriverManager, ResultSet, Timestamp} 
import scala.collection._ 

case class TestPerson(name: String, age: Long, salary: Double) 
var tom = new TestPerson("Tom Hanks",37,35.5) 
var sam = new TestPerson("Sam Smith",40,40.5) 

val PersonList = mutable.MutableList[TestPerson]() 

//Adding data in list 
PersonList += tom 
PersonList += sam 

//Situation 1: Trying to create dataset from List of objects:- Result:Error 
//Throwing error 
var personDS = Seq(PersonList).toDS() 
/* 
ERROR: 
error: Unable to find encoder for type stored in a Dataset. Primitive types 
    (Int, String, etc) and Product types (case classes) are supported by  
importing sqlContext.implicits._ Support for serializing other types will 
be added in future releases. 
    var personDS = Seq(PersonList).toDS() 

*/ 
//Situation 2: Trying to add data 1-by-1 :- Result: not working as desired.  
the last record overwriting any existing data in the DS 
var personDS = Seq(tom).toDS() 
personDS = Seq(sam).toDS() 

personDS += sam //not working. throwing error 


//Situation 3: Working. However, I am having consolidated data in the list  
which I want to convert to DS; if I loop the results of the list in comma 
separated values and then pass that here, it will work but will create an 
extra loop in the code, which I want to avoid. 
var personDS = Seq(tom,sam).toDS() 
scala> personDS.show() 
+---------+---+------+ 
|  name|age|salary| 
+---------+---+------+ 
|Tom Hanks| 37| 35.5| 
|Sam Smith| 40| 40.5| 
+---------+---+------+ 
+0

Какая версия вашей искры и скалы? –

+0

Искры версии 1.6.1 – Leo

ответ

6

Try без Seq:

case class TestPerson(name: String, age: Long, salary: Double) 
val tom = TestPerson("Tom Hanks",37,35.5) 
val sam = TestPerson("Sam Smith",40,40.5) 
val PersonList = mutable.MutableList[TestPerson]() 
PersonList += tom 
PersonList += sam 

val personDS = PersonList.toDS() 
println(personDS.getClass) 
personDS.show() 

val personDF = PersonList.toDF() 
println(personDF.getClass) 
personDF.show() 
personDF.select("name", "age").show() 

Выход:

class org.apache.spark.sql.Dataset 

+---------+---+------+ 
|  name|age|salary| 
+---------+---+------+ 
|Tom Hanks| 37| 35.5| 
|Sam Smith| 40| 40.5| 
+---------+---+------+ 

class org.apache.spark.sql.DataFrame 

+---------+---+------+ 
|  name|age|salary| 
+---------+---+------+ 
|Tom Hanks| 37| 35.5| 
|Sam Smith| 40| 40.5| 
+---------+---+------+ 

+---------+---+ 
|  name|age| 
+---------+---+ 
|Tom Hanks| 37| 
|Sam Smith| 40| 
+---------+---+ 

Кроме того, убедитесь, чтобы переместить декларацию по делу класса TestPersonoutside the scope of your object.

+0

Спасибо за вышеупомянутое решение, он работал в случае набора данных. Моя конечная цель - получить данные в DataFrame. Я использовал эту команду «scala> val RowsDF = sc.parallelize (personDS) .toDF()», но получаю ошибку «: 51: ошибка: тип несоответствия; найдено: org.apache.spark.sql.Dataset [TestPerson] требуется: Seq [?] val RowsDF = sc.parallelize (personDS) .toDF() " – Leo

+0

Я получил это: scala> val RowsDF = personDS.toDF() RowsDF: org.apache.spark.sql.DataFrame = [имя: строка, возраст: bigint, зарплата: двойной] – Leo