Скажем, у меня есть структура данных, как это, где ц некоторая TIMESTAMPСпарк Dataframes- Снижение по ключевым
case class Record(ts: Long, id: Int, value: Int)
Учитывая большое количество этих записей я хочу закончить с записью с самой высокой отметки времени каждый идентификатор. Использование API RDD Я думаю, следующий код получает работу:
def findLatest(records: RDD[Record])(implicit spark: SparkSession) = {
records.keyBy(_.id).reduceByKey{
(x, y) => if(x.ts > y.ts) x else y
}.values
}
Точно так же это моя попытка с наборами данных:
def findLatest(records: Dataset[Record])(implicit spark: SparkSession) = {
records.groupByKey(_.id).mapGroups{
case(id, records) => {
records.reduceLeft((x,y) => if (x.ts > y.ts) x else y)
}
}
}
Я будучи пытаюсь решить, как добиться чего-то подобного с dataframes но не имость я понимаю, что я могу сделать группировку с:
records.groupBy($"id")
но это дает мне RelationGroupedDataSet и это мне не ясно, какую функцию агрегирования мне нужно написать добиться того, что я хочу - все примерные агрегации, которые я видел, сосредоточены на возврате только одного столбца, а не всей строки.
Можно ли достичь этого с помощью данных?
Яп, который работает отлично, спасибо! так как в стороне, я думаю, у вас есть небольшая опечатка в ответе - период после agg должен быть скобкой! – d80tb7
Можете ли вы объяснить, почему это работает? Максимально ли применяется только первый столбец? – user238607
@ user238607 max использует порядок для столбца. Неявное упорядочение для структуры (или кортежа) осуществляется путем упорядочения первого элемента. –