3

Я готовлю данные для ввода для классификатора в Pyspark. Я использую агрегированные функции в SparkSQL для извлечения таких функций, как среднее и дисперсия. Они сгруппированы по активности, имени и окну. Окно было рассчитано путем деления временной отметки unix на 10000, чтобы разбить на 10 секундных окон времени.Pyspark пользовательский агрегатный расчет по столбцам

sample = sqlContext.sql("SELECT activity, name, window, avg(acc_x) as avgX , variance(acc_x) as varX FROM data GROUP BY activity,name,window ORDER BY activity,name,window") 

Результатом этого будет выглядеть

Activity Name   Window  AvgX  VarX 
Walk accelerometer 95875  2.0   1.0 

То, что я хочу сделать сейчас, чтобы вычислить средний наклон каждой точки в X.

Для этого мне нужно метку времени, окна и X. Я реализовал логику в Python, используя массивы, это то, как это будет выглядеть: вычисление наклона между каждой точкой, а затем получение среднего склона. В идеале я хотел бы сделать это в UDAF, который еще не поддерживается в Pyspark. (Это будет выглядеть так, скажем, если функция ниже, называется наклоном Тогда в SQL вы можете сделать slope(timestamp, X) as avgSlopeX

EDIT. - изменил ввод так понятнее Итак, что я делаю именно вычисление наклона между ними. каждая точка, а затем возвращаются в среднем на склонах в этом окне. так что, как я получаю среднее и дисперсию каждого окна, я также хочу, чтобы получить средний уклон.

#sample input 
timestamp = [1464703425544,1464703426534,1464703427551,1464703428587,1464703429512,1464703430493,1464703431505,1464703432543,1464703433513,1464703434529] 

values = [1021.31,1021.26,1021.19,1021.19,1021.1,1021.1,1021.1, 1021.05,1021.02] 

i = 0; 
slope = 0.0; 
totalSlope = 0.0; 

while (i < len(timestamp) - 1): 
    y2 = values[i+1]; 
    y1 = values[i]; 

    x2 = timestamp[i + 1]; 
    x1 = timestamp[i]; 
    slope = ((y2-y1)/(x2-x1)); 
    totalSlope = totalSlope + slope; 
    i=i+1 

avgSlope = (totalSlope/len(x_values)) 

Как я могу это реализовать «Должен ли я попытаться преобразовать в рамку данных pandas, а затем массив numpy? Если да, то как я могу убедиться, что данные будут отображаться правильно, имея в виду действие GROUP BY ivity, окно имени в запросе sql.

+0

Это определенно не подходит для UDAF. – zero323

+0

@ zero323 как бы вы к этому подошли? – other15

+0

Вычислить наклон для последовательных точек, а затем взять простой средний. Но описание ввода здесь довольно расплывчато. Можете ли вы отправить пример данных с ожидаемым выходом? – zero323

ответ

4

Вообще это не работа для UDAF, потому что UDAF не предоставляют никаких средств для определения заказа. Похоже, вам действительно нужна комбинация оконных функций и стандартных агрегатов.

from pyspark.sql.functions import col, lag, avg 
from pyspark.sql.window import Window 

df = ... 
## DataFrame[activity: string, name: string, window: bigint, 
## timestamp: bigint, value: float] 

group = ["activity", "name", "window"] 

w = (Window() 
    .partitionBy(*group) 
    .orderBy("timestamp")) 

v_diff = col("value") - lag("value", 1).over(w) 
t_diff = col("timestamp") - lag("timestamp", 1).over(w) 

slope = v_diff/t_diff 

df.withColumn("slope", slope).groupBy(*group).agg(avg(col("slope"))) 
+0

это похоже на хороший подход !, но, с помощью Column, похоже, игнорирует «наклон» и просто возвращает среднее значение вместо:/если наклон был зарегистрирован как обычный функция может быть использована? – other15

+0

Typo. Это должно быть наклон в предложении agg. – zero323