2016-07-05 2 views
0

У меня есть сеансы пользовательских игр, содержащие: идентификатор пользователя, идентификатор игры, оценку и отметку времени, когда игра была сыграна.Совокупный первый сгруппированный элемент из последующих элементов

from pyspark import SparkContext 
from pyspark.sql import HiveContext 
from pyspark.sql import functions as F 

sc = SparkContext("local") 

sqlContext = HiveContext(sc) 

df = sqlContext.createDataFrame([ 
    ("u1", "g1", 10, 0), 
    ("u1", "g3", 2, 2), 
    ("u1", "g3", 5, 3), 
    ("u1", "g4", 5, 4), 
    ("u2", "g2", 1, 1), 
], ["UserID", "GameID", "Score", "Time"]) 

Желаемой Выход

+------+-------------+-------------+ 
|UserID|MaxScoreGame1|MaxScoreGame2| 
+------+-------------+-------------+ 
| u1|   10|   5| 
| u2|   1|   null| 
+------+-------------+-------------+ 

Я хочу, чтобы преобразовать данные таким образом, что я получаю максимальный счет в первой игре пользователь играл, а также максимальный балл второй игры (бонус если я также могу получить максимальную оценку всех последующих игр). К сожалению, я не уверен, как это возможно с Spark SQL.

Я знаю, что могу группировать по UserID, GameID, а затем agg, чтобы получить максимальный балл и минимальное время. Не уверен, как исходить оттуда.

Уточнение: обратите внимание, что MaxScoreGame1 и MaxScoreGame2 относятся к первому и второму игровому игроку; а не GameID.

ответ

1

Вы можете попробовать использовать комбинацию функций Window и Pivot.

  1. Получить номер строки для каждой игры, разбитой на UserID, упорядоченной по времени.
  2. Отфильтруйте до GameNumber, равного 1 или 2.
  3. Поверните на это, чтобы получить желаемую форму вывода.

К сожалению, я использую scala not python, но нижеследующее должно быть довольно легко переносимым в библиотеку python.

import org.apache.spark.sql.expressions.Window 

// Use a window function to get row number 
val rowNumberWindow = Window.partitionBy(col("UserId")).orderBy(col("Time")) 

val output = { 
    df 
    .select(
     col("*"), 
     row_number().over(rowNumberWindow).alias("GameNumber") 
    ) 
    .filter(col("GameNumber") <= lit(2)) 
    .groupBy(col("UserId")) 
    .pivot("GameNumber") 
    .agg(
     sum(col("Score")) 
    ) 
} 

output.show() 

+------+---+----+ 
|UserId| 1| 2| 
+------+---+----+ 
| u1| 10| 2| 
| u2| 1|null| 
+------+---+----+ 
+1

Кроме того, чтобы добавить, если вы хотите увидеть больше, чем две игры в выходе просто не фильтровать и шарнирный позаботится об остальном. – Blakey

+0

Окно и row_number сделали трюк. Я собираюсь опубликовать свое решение в PySpark, которое немного отличается. Можете ли вы проверить, работает ли ваш код с шоу, чтобы я мог предоставить вам ответ? – ksindi

+1

Просто обновленный с выходом, а также заметил, что я фактически использовал select вместо groupBy на своде, который не сработает. Заинтересованы в том, как вы получили 5 в качестве 2-го игрового счета для пользователя 1, предполагая, что в исходном фрейме данных есть опечатка, согласно вашему сообщению («u1», «g3», 2, 2), («u1», «g3», 5, 3), – Blakey

1

Решение с PySpark:

from pyspark.sql import Window 

rowNumberWindow = Window.partitionBy("UserID").orderBy(F.col("Time")) 

(df 
.groupBy("UserID", "GameID") 
.agg(F.max("Score").alias("Score"), 
     F.min("Time").alias("Time")) 
.select(F.col("*"), 
     F.row_number().over(rowNumberWindow).alias("GameNumber")) 
.filter(F.col("GameNumber") <= F.lit(2)) 
.withColumn("GameMaxScoreCol", F.concat(F.lit("MaxScoreGame"), F.col("GameNumber"))) 
.groupBy("UserID") 
.pivot("GameMaxScoreCol") 
.agg(F.max("Score")) 
).show() 

+------+-------------+-------------+ 
|UserID|MaxScoreGame1|MaxScoreGame2| 
+------+-------------+-------------+ 
| u1|   10|   5| 
| u2|   1|   null| 
+------+-------------+-------------+