2016-12-29 22 views
0

Ниже приведен мой набор данных.API-справочник по API: найти распределение использования устройства для каждого пользователя вместе с другим агрегированием

user,device,time_spent,video_start 
userA,mob,5,1 
userA,desk,5,2 
userA,desk,5,3 
userA,mob,5,2 
userA,mob,5,2 
userB,desk,5,2 
userB,mob,5,2 
userB,mob,5,2 
userB,desk,5,2 

Я хочу узнать об агрегировании ниже для каждого пользователя.

user  total_time_spent  device_distribution 
    userA   20    {mob:60%,desk:40%} 
    userB   20    {mob:50%,desk:50%} 

Может ли кто-нибудь помочь мне достичь этого, используя искру 2.0 API, желательно на Java. Я попытался использовать UserDefinedAggregateFunction, но он не поддерживает группу внутри группы, поскольку мне приходится группировать каждую группу пользователей по устройству, чтобы найти агрегированное время, затрачиваемое на каждое устройство.

ответ

1

Здесь функция pivot очень полезна. article от Databricks по этому вопросу. Для кода (жаль, что Scala, но это не должно быть большой проблемой, чтобы перевести его на Java):

import org.apache.spark.sql.functions.udf 

case class DeviceDistribution(mob: String, desk: String) 

val makeDistribution = udf((mob: Long, desk: Long) => { 
    val mobPct = 100.0 * mob/(mob + desk) 
    val deskPct = 100.0 * desk/(mob + desk) 

    DeviceDistribution(s"$mobPct%", s"$deskPct%") 
}) 

// load your dataset 

data 
    .groupBy("user", "device") 
    .agg(sum("time_spent").as("total_time_spent_by_device")) 
    .groupBy("user") 
    .pivot("device", Seq("mob", "desk")) 
    .agg(first(col("total_time_spent_by_device"))) 
    .withColumn("total_time_spent", col("mob") + col("desk")) 
    .withColumn("device_distribution", makeDistribution(col("mob"), col("desk"))) 
    .select("user", "total_time_spent", "device_distribution") 
    .show 

// Result 
+-----+----------------+-------------------+ 
| user|total_time_spent|device_distribution| 
+-----+----------------+-------------------+ 
|userA|    25|  [60.0%,40.0%]| 
|userB|    20|  [50.0%,50.0%]| 
+-----+----------------+-------------------+ 

NB: с функцией pivot вам нужна функция агрегации. Здесь, поскольку на устройстве есть только одно значение, вы можете просто использовать first.

Формат device_distribution колонки не совсем то, что вы ищете, но:

  • после поворота линии вы можете сделать все, что вы хотите с вашими ценностями (который включает в себя форматирование вы хотите)
  • с этим case class при сохранении ваших выходных данных в формате json, например, он будет иметь именно тот формат, который вы хотите.
1

Флоран Moiny,

Благодаря ответить на мой вопрос.

Однако я нашел, что это решение имеет некоторые проблемы, если я хочу его вставить.

Например, мне нужно знать заранее, что количество типов устройств возможно в моем источнике данных TB. В этой ситуации также трудно понять ситуацию.

Я предоставил полное решение этой проблемы на Java. Вы можете видеть это здесь.

Я использовал UserDefinedAggregateFunction для этой цели, который UDF специально для ситуации Агрегата.

В основном, сначала я сгруппировался на User и Device, а затем назвал этот пользовательский UDF, чтобы найти распределение устройств в одно и то же время, сделать другое агрегирование на уровне пользователя.

https://github.com/himanshu-parmar-bigdata/spark-java-udf-demo

Спасибо, Himanshu