Я использую API-интерфейсы API для удаления дубликатов. То, что я пытаюсь сделать, это группировать дублированные строки, чтобы оставить только одну строку из каждой группы, но с столбцом, определяющим количество строк, которые были свернуты в эту строку.Как выполнить некоторые операции с набором данных Spark, не влияя на структуру данных?
Рассмотрим следующий пример. У меня есть следующие данные, где последнее поле указывает строки разрушилась в этой строке:
- A, B, C, 5
- A, D, G, 1
В этот момент, Я хотел бы сгруппировать данные по первому полю, сохранить остальные поля в строке, и большинство строк свернулось в нее и добавит количество строк, свернутых во второй, к первому. Таким образом, результат будет:
- A, B, C, 6
Я уже реализовал его, и проблема о формате получаемых данных.
Вот мой код:
val sameTitleArticlesCollapsed = articlesCollapsed.groupByKey(_.TITLE).reduceGroups((a,b) => if(a.TIMES_COLLAPSED > b.TIMES_COLLAPSED) a.copy(TIMES_COLLAPSED = a.TIMES_COLLAPSED + b.TIMES_COLLAPSED) else b.copy(TIMES_COLLAPSED = a.TIMES_COLLAPSED + b.TIMES_COLLAPSED)).toDF("key", "data")
Если я исполняю printSchema
на sameTitleArticlesCollapsed
, выход:
root
|-- key: string (nullable = true)
|-- data: struct (nullable = true)
| |-- CODE: string (nullable = true)
| |-- TITLE: string (nullable = true)
| |-- NAUTHORS: string (nullable = true)
| |-- AUTHORS: string (nullable = true)
| |-- TIMES_COLLAPSED: decimal(38,0) (nullable = true)
Я не забочусь о key
колонки и то, что я хотел бы, чтобы извлеките данные внутри столбца data
, чтобы сохранить его в том же формате, что и перед применением groupByKey - reduceGroups
.
root
|-- CODE: string (nullable = true)
|-- TITLE: string (nullable = true)
|-- NAUTHORS: string (nullable = true)
|-- AUTHORS: string (nullable = true)
|-- TIMES_COLLAPSED: long (nullable = false)
Как я мог это сделать? Есть ли лучший способ сделать этот процесс?
Спасибо!