3

У меня есть некоторые данные, хранящиеся в DataSet [(Long, LINESTRING)] с использованием кортежа кодер с Kryo кодера для LineStringSpark 2.0.0: Как объединить DataSet с пользовательскими кодированными типами?

implicit def single[A](implicit c: ClassTag[A]): Encoder[A] = Encoders.kryo[A](c) 
implicit def tuple2[A1, A2](implicit 
          e1: Encoder[A1], 
          e2: Encoder[A2] 
          ): Encoder[(A1,A2)] = Encoders.tuple[A1,A2](e1, e2) 
implicit val lineStringEncoder = Encoders.kryo[LineString] 

val ds = segmentPoints.map(
    sp => { 
    val p1 = new Coordinate(sp.lon_ini, sp.lat_ini) 
    val p2 = new Coordinate(sp.lon_fin, sp.lat_fin) 
    val coords = Array(p1, p2) 

    (sp.id, gf.createLineString(coords)) 
    }) 
    .toDF("id", "segment") 
    .as[(Long, LineString)] 
    .cache 

ds.show 

    +----+--------------------+ 
    | id |  segment  | 
    +----+--------------------+ 
    | 347|[01 00 63 6F 6D 2...| 
    | 347|[01 00 63 6F 6D 2...| 
    | 347|[01 00 63 6F 6D 2...| 
    | 808|[01 00 63 6F 6D 2...| 
    | 808|[01 00 63 6F 6D 2...| 
    | 808|[01 00 63 6F 6D 2...| 
    +----+--------------------+ 

можно применить любую операцию по карте на колонке сегмента и использовать основные методы LineStrign.

ds.map(_._2.getClass.getName).show(false) 

+--------------------------------------+ 
|value         | 
+--------------------------------------+ 
|com.vividsolutions.jts.geom.LineString| 
|com.vividsolutions.jts.geom.LineString| 
|com.vividsolutions.jts.geom.LineString| 

Я хотел бы создать несколько UDAFs для обработки сегментов с одинаковыми идентификаторами, я попробовал феллинг два различных подхода без успеха:

1) Использование агрегатора:

val length = new Aggregator[LineString, Double, Double] with Serializable { 
    def zero: Double = 0      // The initial value. 
    def reduce(b: Double, a: LineString) = b + a.getLength // Add an element to the running total 
    def merge(b1: Double, b2: Double) = b1 + b2 // Merge intermediate values. 
    def finish(b: Double) = b 
    // Following lines are missing on the API doc example but necessary to get 
    // the code compile 
    override def bufferEncoder: Encoder[Double] = Encoders.scalaDouble 
    override def outputEncoder: Encoder[Double] = Encoders.scalaDouble 
}.toColumn 

ds.groupBy("id) 
    .agg(length(col("segment")).as("kms")) 
    .show(false) 

Здесь я получаю следующее сообщение об ошибке:

Exception in thread "main" org.apache.spark.sql.AnalysisException: unresolved operator 'Aggregate [id#603L], [id#603L, anon$1([email protected], None, input[0, double, true] AS value#715, cast(value#715 as double), input[0, double, true] AS value#714, DoubleType, DoubleType)['segment] AS kms#721]; 

2) Использование UserDefinedAggregateFunction

class Length extends UserDefinedAggregateFunction { 
    val e = Encoders.kryo[LineString] 

    // This is the input fields for your aggregate function. 
    override def inputSchema: StructType = StructType(
    StructField("segment", DataTypes.BinaryType) :: Nil 
) 

    // This is the internal fields you keep for computing your aggregate. 
    override def bufferSchema: StructType = StructType(
     StructField("length", DoubleType) :: Nil 
) 

    // This is the output type of your aggregatation function. 
    override def dataType: DataType = DoubleType 

    override def deterministic: Boolean = true 

    // This is the initial value for your buffer schema. 
    override def initialize(buffer: MutableAggregationBuffer): Unit = { 
    buffer(0) = 0.0 
    } 

    // This is how to update your buffer schema given an input. 
    override def update(buffer : MutableAggregationBuffer, input : Row) : Unit = { 
    // val l0 = input.getAs[LineString](0) // Can't cast to LineString (I guess because it is searialized using given encoder) 
    val b = input.getAs[Array[Byte]](0) // This works fine 
    val lse = e.asInstanceOf[ExpressionEncoder[LineString]] 
    val ls = lse.fromRow(???) // it expects InternalRow but input is a Row instance 
    // I also tried casting b.asInstance[InternalRow] without success. 
    buffer(0) = buffer.getAs[Double](0) + ls.getLength 
    } 

    // This is how to merge two objects with the bufferSchema type. 
    override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = { 
    buffer1(0) = buffer1.getAs[Double](0) + buffer2.getAs[Double](0) 
    } 

    // This is where you output the final value, given the final value of your bufferSchema. 
    override def evaluate(buffer: Row): Any = { 
    buffer.getDouble(0) 
    } 
} 

val length = new Length 
rseg 
    .groupBy("id") 
    .agg(length(col("segment")).as("kms")) 
    .show(false) 

Что я делаю неправильно? Я хотел бы использовать API агрегации с пользовательскими типами вместо использования rdd groupBy API. Я искал документ Spark, но не смог найти ответ на эту проблему, похоже, на ранней стадии.

Спасибо.

ответ

0

В соответствии с этим answer нет простого способа передачи в пользовательских кодировках для вложенных типов, например (Long, LineString) в вашем случае.

Одним из вариантов может быть, чтобы определить case class LineStringWithID, который мог бы расширить LineString с id: Long атрибутом, а также использовать кодеры из SQLImplicits

P.S. Можете ли вы разбить свои вопросы на более мелкие части, по одной теме?

 Смежные вопросы

  • Нет связанных вопросов^_^