3

Скажем, у меня есть структура данных, как это, где ц некоторая TIMESTAMPСпарк Dataframes- Снижение по ключевым

case class Record(ts: Long, id: Int, value: Int) 

Учитывая большое количество этих записей я хочу закончить с записью с самой высокой отметки времени каждый идентификатор. Использование API RDD Я думаю, следующий код получает работу:

def findLatest(records: RDD[Record])(implicit spark: SparkSession) = { 
    records.keyBy(_.id).reduceByKey{ 
    (x, y) => if(x.ts > y.ts) x else y 
    }.values 
} 

Точно так же это моя попытка с наборами данных:

def findLatest(records: Dataset[Record])(implicit spark: SparkSession) = { 
    records.groupByKey(_.id).mapGroups{ 
    case(id, records) => { 
     records.reduceLeft((x,y) => if (x.ts > y.ts) x else y) 
    } 
    } 
} 

Я будучи пытаюсь решить, как добиться чего-то подобного с dataframes но не имость я понимаю, что я могу сделать группировку с:

records.groupBy($"id") 

но это дает мне RelationGroupedDataSet и это мне не ясно, какую функцию агрегирования мне нужно написать добиться того, что я хочу - все примерные агрегации, которые я видел, сосредоточены на возврате только одного столбца, а не всей строки.

Можно ли достичь этого с помощью данных?

ответ

7

Вы можете использовать Argmax логику (см databricks example)

Например, предположим, что ваш dataframe называется ДФ и имеет столбцы идентификатор, Валя, ц вы могли бы сделать что-то вроде этого:

import org.apache.spark.sql.functions._ 
val newDF = df.groupBy('id).agg.max(struct('ts, 'val)) as 'tmp).select($"id", $"tmp.*") 
+0

Яп, который работает отлично, спасибо! так как в стороне, я думаю, у вас есть небольшая опечатка в ответе - период после agg должен быть скобкой! – d80tb7

+0

Можете ли вы объяснить, почему это работает? Максимально ли применяется только первый столбец? – user238607

+0

@ user238607 max использует порядок для столбца. Неявное упорядочение для структуры (или кортежа) осуществляется путем упорядочения первого элемента. –

0

Для Datasets я сделал это, испытан на Спарке 2.1.1

final case class AggregateResultModel(id: String, 
             mtype: String, 
             healthScore: Int, 
             mortality: Float, 
             reimbursement: Float) 
..... 
..... 

// assume that the rawScores are loaded behorehand from json,csv files 

val groupedResultSet = rawScores.as[AggregateResultModel].groupByKey(item => (item.id,item.mtype)) 
     .reduceGroups((x,y) => getMinHealthScore(x,y)).map(_._2) 


// the binary function used in the reduceGroups 

def getMinHealthScore(x : AggregateResultModel, y : AggregateResultModel): AggregateResultModel = { 
    // complex logic for deciding between which row to keep 
    if (x.healthScore > y.healthScore) { return y } 
    else if (x.healthScore < y.healthScore) { return x } 
    else { 

     if (x.mortality < y.mortality) { return y } 
     else if (x.mortality > y.mortality) { return x } 
     else { 

     if(x.reimbursement < y.reimbursement) 
      return x 
     else 
      return y 

     } 

    } 

    }