2015-05-13 1 views
17

Я пытаюсь сравнить различные способы агрегирования моих данных.Spark: Как перевести счет (отличное (значение)) в Dataframe API

Это мои входные данные с 2-мя элементами (страницы, посетитель):

(PAG1,V1) 
(PAG1,V1) 
(PAG2,V1) 
(PAG2,V2) 
(PAG2,V1) 
(PAG1,V1) 
(PAG1,V2) 
(PAG1,V1) 
(PAG1,V2) 
(PAG1,V1) 
(PAG2,V2) 
(PAG1,V3) 

Работа с командой SQL в Спарк SQL с этим кодом:

import sqlContext.implicits._ 
case class Log(page: String, visitor: String) 
val logs = data.map(p => Log(p._1,p._2)).toDF() 
logs.registerTempTable("logs") 
val sqlResult= sqlContext.sql(
           """select page 
             ,count(distinct visitor) as visitor 
            from logs 
           group by page 
           """) 
val result = sqlResult.map(x=>(x(0).toString,x(1).toString)) 
result.foreach(println) 

я получаю этот выход:

(PAG1,3) // PAG1 has been visited by 3 different visitors 
(PAG2,2) // PAG2 has been visited by 2 different visitors 

Теперь я хотел бы получить тот же результат, используя Dataframes и API-интерфейс thiers, но я не могу получить то же самое Выход:

import sqlContext.implicits._ 
case class Log(page: String, visitor: String) 
val logs = data.map(p => Coppia(p._1,p._2)).toDF() 
val result = log.select("page","visitor").groupBy("page").count().distinct 
result.foreach(println) 

На самом деле, это то, что я получаю в качестве вывода:

[PAG1,8] // just the simple page count for every page 
[PAG2,4] 

Это, вероятно, что-то тупой, но я не могу увидеть его прямо сейчас.

Заранее благодарен!

FF

ответ

36

Что вам нужно, это DataFrame функция агрегации countDistinct:

import sqlContext.implicits._ 
import org.apache.spark.sql.functions._ 

case class Log(page: String, visitor: String) 

val logs = data.map(p => Coppia(p._1,p._2)) 
      .toDF() 

val result = log.select("page","visitor") 
      .groupBy('page) 
      .agg('page, countDistinct('visitor)) 

result.foreach(println) 
+2

Я получаю эту ошибку -> не найдено: значение countDistinct –

+1

это метод 'org.apache.spark.sql .functions', import that :), edit done. –

+0

с intelliJ Мне нужно написать команду agg/countDistinct, такую ​​как .agg (org.apache.spark.sql.functions.countDistinct («visitor»)), потому что даже если я импортировал org.apache.spark.sql. функции все равно дают мне ту же ошибку ... в любом случае это работает, но я получаю только столбец посетителя и столбец страницы ([2], [3]) ... что мне не хватает? –

 Смежные вопросы

  • Нет связанных вопросов^_^