2017-02-21 53 views
1

Я использую API-интерфейсы API для удаления дубликатов. То, что я пытаюсь сделать, это группировать дублированные строки, чтобы оставить только одну строку из каждой группы, но с столбцом, определяющим количество строк, которые были свернуты в эту строку.Как выполнить некоторые операции с набором данных Spark, не влияя на структуру данных?

Рассмотрим следующий пример. У меня есть следующие данные, где последнее поле указывает строки разрушилась в этой строке:

  • A, B, C, 5
  • A, D, G, 1

В этот момент, Я хотел бы сгруппировать данные по первому полю, сохранить остальные поля в строке, и большинство строк свернулось в нее и добавит количество строк, свернутых во второй, к первому. Таким образом, результат будет:

  • A, B, C, 6

Я уже реализовал его, и проблема о формате получаемых данных.

Вот мой код:

val sameTitleArticlesCollapsed = articlesCollapsed.groupByKey(_.TITLE).reduceGroups((a,b) => if(a.TIMES_COLLAPSED > b.TIMES_COLLAPSED) a.copy(TIMES_COLLAPSED = a.TIMES_COLLAPSED + b.TIMES_COLLAPSED) else b.copy(TIMES_COLLAPSED = a.TIMES_COLLAPSED + b.TIMES_COLLAPSED)).toDF("key", "data") 

Если я исполняю printSchema на sameTitleArticlesCollapsed, выход:

root 
|-- key: string (nullable = true) 
|-- data: struct (nullable = true) 
| |-- CODE: string (nullable = true) 
| |-- TITLE: string (nullable = true) 
| |-- NAUTHORS: string (nullable = true) 
| |-- AUTHORS: string (nullable = true) 
| |-- TIMES_COLLAPSED: decimal(38,0) (nullable = true) 

Я не забочусь о key колонки и то, что я хотел бы, чтобы извлеките данные внутри столбца data, чтобы сохранить его в том же формате, что и перед применением groupByKey - reduceGroups.

root 
|-- CODE: string (nullable = true) 
|-- TITLE: string (nullable = true) 
|-- NAUTHORS: string (nullable = true) 
|-- AUTHORS: string (nullable = true) 
|-- TIMES_COLLAPSED: long (nullable = false) 

Как я мог это сделать? Есть ли лучший способ сделать этот процесс?

Спасибо!

ответ

2

можно добавить карту в конце, как показано ниже, чтобы сохранить оригинальную схему,

val sameTitleArticlesCollapsed = articlesCollapsed.groupByKey(_.title).reduceGroups((a,b) => if(a.times_collapsed > b.times_collapsed) a.copy(times_collapsed = a.times_collapsed + b.times_collapsed) else b.copy(times_collapsed = a.times_collapsed + b.times_collapsed)) 

val result = sameTitleArticlesCollapsed.map({case (_,value) => value}).toDF 

result.printSchema 
root 
|-- code: string (nullable = true) 
|-- title: string (nullable = true) 
|-- nauthors: string (nullable = true) 
|-- authors: string (nullable = true) 
|-- times_collapsed: long (nullable = true)