2

Я оцениваю замену существующего кода RDD на Dataset. Для одного из моих случаев использования я не могу сопоставить набор данных с другим классом case.Dataset in Spark 1.6

Вот что я пытаюсь сделать ...

case class MyMap(map: Map[String, String]) 

case class V1(a: String, b: String){ 
    def toMyMap: MyMap = { 
    MyMap(Map(a->b)) 
    } 

    def toStr: String = { 
    a 
    } 
} 

object MyApp extends App { 
//Get handle to sqlContext and other useful stuff here. 
val df1 = sqlContext.createDataset(Seq(V1("2015-05-01", "data1"), V1("2015-05-01", "data2"))).toDF() 
df1.as[V1].map(_.toMyMap).show() //Errors out. Added the exception below. 
df1.as[V1].map(_.toStr).show() //Works fine. 
} 

Любая помощь будет оценена.

со следующим исключением:

Исключение в потоке "основного" org.apache.spark.SparkException: Работа прервана из-за сбоя стадии: Задача не сериализуемым: java.io.NotSerializableException: Scala .reflect.runtime.SynchronizedSymbols $ SynchronizedSymbol $$ Анон $ 1 стек Сериализация: - объект не сериализации (класс: scala.reflect.runtime.SynchronizedSymbols $ SynchronizedSymbol $$ Анон $ 1, значение: пакет Ланг) - поле (класс: scala.reflect.internal.Types $ ThisType, name: sym, type: class scal_reflect.internal.Symbols $ Symbol) - объект (класс scala.reflect.internal.Types $ UniqueThisType, java.lang.type) - поле (класс: scala.reflect.internal.Types $ TypeRef, name: pre , type: class scala.reflect.internal.Types $ Type) - объект (класс scala.reflect.internal.Types $ ClassNoArgsTypeRef, String) - field (класс: scala.reflect.internal.Types $ ТипRef, name: normalized , type: class scala.reflect.internal.Types $ Type) - объект (класс scala.reflect.internal.Types $ AliasNoArgsTypeRef, String) - поле (класс: org.apache.spark.sql.catalyst.ScalaReflection $$ anonfun $ 6, name: keyType $ 1, type: class scala.reflect.api.Types $ TypeApi) - объект (класс org.apache.spark.sql.catalyst.ScalaReflection $$ anonfun $ 6,) - field (класс: org.apache.spark.sql.catalyst.expressions.MapObjects, name: function, type: interface scala.Function1) - объект (класс org.apache.spark.sql.catalyst.expressions.MapObjects, mapobjects (, invoke (upcast ('map, MapType (StringType, StringType, true), - Поле (класс: "scala.collection.immutable.Map", имя: "map"), - корень класс: "collector. MyMap "), keyArray, ArrayType (StringType, true)), StringType)) - поле (класс: org.apache.spark.sql.catalyst.expressions.Invoke, name: targetObject, введите: class org.apache.spark .sql.catalyst.expressions.Expression) - объект (класс org.apache.spark.sql.catalyst.expressions.Invoke, invoke (mapobjects (, invoke (upcast ('map, MapType (StringType, StringType, true), - раздел (класс: "scala.collection.immutable.M ap ", name:" map "), - root класс: " collector.MyMap "), keyArray, ArrayType (StringType, true)), StringType), array, ObjectType (класс [Ljava.lang.Object;))) - writeObject data (класс: scala.collection.immutable.List $ SerializationProxy) - объект (класс scala.collection.immutable.List $ SerializationProxy, [email protected]) - writeReplace data (класс: scala.collection.immutable.List $ SerializationProxy) - объект (класс scala.collection.immutable. $ colon $ colon, List (invoke (mapobjects (, invoke (upcast ('map, MapType (StringType, StringType, true), - Поле (класс: "scala.collection.immutable.Map", name: "map"), - root класс: "collector.MyMap"), keyArray, A rrayType (StringType, true)), StringType), array, ObjectType (класс [Ljava.lang.Object;)), invoke (mapobjects (, invoke (upcast ('map, MapType (StringType, StringType, true), - раздел (класс: "scala.collection.immutable.Map ", name:" map "), - root класс: " collector.MyMap "), valueArray, ArrayType (StringType, true)), StringType), array, ObjectType (класс [Ljava.lang.Object;)))) - поле (класс: org.apache.spark.sql.catalyst.expressions.StaticInvoke, name: arguments, type: interface scala.collection.Seq) - object (класс org.apache.spark.sql. catal.expressions.StaticInvoke, staticinvoke (класс org.apache.spark.sql.catalyst.util.ArrayBasedMapData $, ObjectType (интерфейс scala.collection.Map), toScalaMap, invoke (mapobjects (, invoke (upcast ('map , MapType (StringType, StringType, true), - Поле (класс: "scala.collection.immutable.Map", имя: "map"), - корень класс: «collector.MyMap»), keyArray, ArrayType (StringType, true)), StringType), array, ObjectType (класс [Ljava.lang.Object;)), invoke (mapobjects (, invoke (upcast ('map, MapType (StringType, StringType, true), - Поле (класс: «scala.collection.immutable.Map», имя: «map»), - root класс: «collector.MyMap»), valueArray, ArrayType (StringType, true)), StringType), array, ObjectType (класс [Ljava.lang.Object;)), true)) - writeObject data (класс: scala.collection.immutable.List $ SerializationProxy) - объект (класс scala. collection.immutable.List $ SerializationProxy, [email protected]) - writeReplace data (класс: scala.collection.immutable.List $ SerializationProxy) - объект (класс scala .collection.immutable. $ colon $ colon, List (staticinvoke (класс org.apache.spark.sql.catalyst.util.ArrayBasedMapData $, ObjectType (интерфейс scala.collection.Map), toScalaMap, invoke (mapobjects (, invoke) (upcast ('map, MapType (StringType, StringType, true), - Поле (класс: "scala.collection.immutable.Map", имя: "map"), - корень класс: "collector.MyMap") , keyArray, ArrayType (StringType, true)), StringType), array, ObjectType (класс [Ljava.lang.Object;)), invoke (mapobjects (, invoke (upcast ('map, MapType (StringType, StringType, true) , - Поле (класс: "scala.collection.immutable.Map", name: "map"), - root класс: "collector.MyMap"), valueArray, ArrayType (StringType, true)), StringType), array, ObjectType (класс [Lja va.lang.Object;)), true))) - поле (класс: org.apache.spark.sql.catalyst.expressions.NewInstance, name: аргументы, введите: interface scala.collection.Seq) - объект (класс org.apache.spark.sql.catalyst.expressions.NewInstance, newinstance (класс collector.MyMap, staticinvoke (класс org.apache.spark.sql.catalyst.util.ArrayBasedMapData $, ObjectType (интерфейс scala.collection .Map), toScalaMap, invoke (mapobjects (, invoke (upcast ('map, MapType (StringType, StringType, true), - Поле (класс: "scala.collection.immutable.Map", имя: "map"), - root класс: класс: «collector.MyMap»), keyArray, ArrayType (StringType, true)), StringType), array, ObjectType (класс [Ljava.lang.Object;)), invoke (mapobjects (, invoke (upca st ('map, MapType (StringType, StringType, true), - Поле (класс: "scala.collection.immutable.Map", имя: "map"), - корень класс: "collector.MyMap"), valueArray, ArrayType (StringType, true)), StringType), array, ObjectType (класс [Ljava.lang.Object;)), true), false, ObjectType (класс collector.MyMap), None)) - поле (class: org.apache.spark.sql.catalyst.encoders.ExpressionEncoder, имя: fromRowExpression, type: class org.apache.spark.sql.catalyst.expressions.Expression) - объект (класс org.apache.spark. sql.catalyst.encoders.ExpressionEncoder, класс [map # ExprId (9,255a02aa-f2fa-482d-8cd1-63e2d4d08b30): map]) - field (класс: org.apache.spark.sql.execution.MapPartitions, имя: uEncoder, type: class org.apache.spark.sql.catalyst.encoders.ExpressionEncoder) - объект (класс org.apache.spark.sql.execution.MapPartitions,! MapPartitions, класс [a [0]: строка, b [0]: строка], класс [map # ExprId (9,255a02aa-f2fa-482d-8cd1-63e2d4d08b30): map], [map # 13] + - [0180000000a, 2800000005,2d35302d35313032,3130,3161746164], [0,180000000a, 2800000005,2d35302d35313032,3130,3261746164]] ) - field (класс: org. apache.spark.sql.execution.MapPartitions $$ anonfun $ 8, name: $ outer, Тип: класс org.apache.spark.sql.execution.MapPartitions) - объект (класс org.apache.spark.sql.execution. MapPartitions $$ anonfun $ 8,) - поле (класс: org.apache.spark.rdd.RDD $$ anonfun $ mapPartitionsInternal $ 1, name: f $ 22, Тип: интерфейс scala.Function1) - объект (класс org.apache.spark.rdd.RDD $ $ anonfun $ mapPartitionsInternal $ 1, ) - поле (класс: org.apache.spark.rdd.RDD $$ anonfun $ mapPartitionsInternal $ 1 $$ anonfun $ применяются $ 21, имя: $ внешний, тип: класс org.apache. spark.rdd.RDD $$ anonfun $ mapPartitionsInternal $ 1) - объект (класс org.apache.spark.rdd.RDD $$ anonfun $ mapPartitionsInternal $ 1 $$ anonfun $ apply $ 21, ) - поле (класс: org.apache .spark.rdd.MapPartitionsRDD, name: f, type: interface scala.Function3) - объект (класс org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD [1] на шоу в CollectorSpa rkTest.scala: 50) - поле (класс: org.apache.spark.NarrowDependency, name: rdd, type: class org.apache.spark.rdd.RDD) - объект (класс org.apache.spark.OneToOneDependency , [email protected]) - writeObject data (класс: scala.collection.immutable.List $ SerializationProxy) - объект (класс scala.collection.immutable.List $ SerializationProxy, scala.collection.immutable.List $ SerializationProxy @ 6bb23696) - writeReplace data (класс: scala.collection.immutable.List $ SerializationProxy) - объект (класс scala.collection.immutable. $ Colon $ colon, List ([email protected])) - поле (класс: org.apache.spark.rdd.RDD, имя: org $ apache $ spark $ rdd $ RDD $$ зависимости, тип: in terface scala.collection.Seq) - объект (класс org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD [2] при показе на CollectorSparkTest.scala: 50) - поле (класс: scala.Tuple2, name: _1, type: class java.lang.Object) - объект (класс scala.Tuple2, (MapPartitionsRDD [2] при показе на CollectorSparkTest.scala: 50,)) в org.apache.spark.scheduler.DAGScheduler.org $ apache $ искра $ планировщик $ DAGScheduler $$ failJobAndIndependentStages (DAGScheduler.scala: 1431) в org.apache.spark.scheduler.DAGScheduler $$ anonfun $ abortStage $ 1.Apply (DAGScheduler.scala: 1419) в org.apache.spark .scheduler.DAGScheduler $$ anonfun $ abortStage $ 1.apply (DAGScheduler.scala: 1418) at s cala.collection.mutable.ResizableArray $ class.foreach (ResizableArray.scala: 59) at scala.collection.mutable.ArrayBuffer.foreach (ArrayBuffer.scala: 48) at org.apache.spark.scheduler.DAGScheduler.abortStage (DAGScheduler.scala: 1418) на org.apache.spark.scheduler.DAGScheduler.submitMissingTasks (DAGScheduler.scala: 1010) на org.apache.spark.scheduler.DAGScheduler.org $ апач $ искрой $ планировщик $ DAGScheduler $$ submitStage (DAGScheduler.scala: 921) на org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted (DAGScheduler.scala: 861) на org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive (DAGScheduler.Scala: 1607), на org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive (DAGScheduler.scala: 1599) при org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive (DAGScheduler.scala: 1588) в орг. apache.spark.util.EventLoop $$ Анон $ 1.run (EventLoop.scala: 48) на org.apache.spark.scheduler.DAGScheduler.runJob (DAGScheduler.scala: 620) в org.apache.spark.SparkContext .runJob (SparkContext.scala: 1832) на org.apache.spark.SparkContext.runJob (SparkContext.scala 1845) на org.apache.spark.SparkContext.runJob (SparkContext.scala: 1858) на org.apache .spark.sql.execution.SparkPlan.executeTake (SparkPlan.scala: 212) в org.apache.spark.sql.execution.Limit.executeCollect (basicOperators.scala: 165) в org.apache.spark.sql.execution.SparkPlan.executeCollectPublic (SparkPlan.scala: 174) в орг .apache.spark.sql.DataFrame $$ anonfun $ орг апач $ $ $ пинок SQL кадр данных $ $$ выполнить $ 1 $ 1.Apply (DataFrame.scala: 1538) в org.apache.spark.sql.DataFrame $$ anonfun орг апач $ $ $ $ пнуть кадр данных SQL $$ $ $ 1 выполнить $ 1.Apply (DataFrame.scala: 1538) в org.apache.spark.sql.execution.SQLExecution $ .withNewExecutionId (SQLExecution.scala: 56) в org.apache.spark.sql.DataFrame.withNewExecutionId (DataFrame.scala: 2125) на org.apac he.spark.sql.DataFrame.org $ апач $ пинок $ SQL $ кадр данных $$ выполнять $ 1 (DataFrame.scala: 1537) на org.apache.spark.sql.DataFrame.org $ апач $ пинок $ SQL $ кадр данных $$ сбор (DataFrame.scala: 1544) в org.apache.spark.sql.DataFrame $$ anonfun $ голова $ 1.Apply (DataFrame.scala: 1414) на org.apache.spark.sql.DataFrame $ $ anonfun $ голова $ 1.Apply (DataFrame.scala 1413) на org.apache.spark.sql.DataFrame.withCallback (DataFrame.scala: 2138) на org.apache.spark.sql.DataFrame.head (Frame Data. 1413 Скала) на org.apache.spark.sql.DataFrame.take (DataFrame.scala: 1495) при org.apache.spark.sql.DataFrame.showString (DataFrame.scala: 171) в org.apache. spark.sql.DataFrame.show (DataFra me.scala: 394) на org.apache.spark.sql.Dataset.show (Dataset.scala: 228) в org.apache.spark.sql.Dataset.show (Dataset.scala: 192) в орг. apache.spark.sql.Dataset.show (Dataset.scala: 200)

ответ

1

Проблема в том, что класс Scala карты не сериализации, поэтому API Dataset не могут автоматически генерировать соответствующий датчик. Я хотел бы предложить преобразование карты в строку, а потом разобрать строку и преобразовать обратно в карту (если вы Увеличительные строки в карте).

наборам данных API не может быть лучшим выбором либо. Я написал this article, которые могут представлять интерес.

2

Я думаю, что вы могли бы на самом деле быть удар SPARK-12696, который закреплен в кик/мастер. Я надеюсь выпустить 1.6.1 в ближайшем будущем, которые должны включать этот патч.