0

У меня есть dataframe, который имеет несколько атрибутов (от C1 до C2), смещение (в днях) и несколько значений (V1, V2).Spark Dataframe/Dataset: общая условная суммарная сумма

val inputDF= spark.sparkContext.parallelize(Seq((1,2,30, 100, -1),(1,2,30, 100, 0), (1,2,30, 100, 1),(11,21,30, 100, -1),(11,21,30, 100, 0), (11,21,30, 100, 1)), 10).toDF("c1", "c2", "v1", "v2", "offset") 
inputDF: org.apache.spark.sql.DataFrame = [c1: int, c2: int ... 3 more fields] 

scala> inputDF.show 
+---+---+---+---+------+ 
| c1| c2| v1| v2|offset| 
+---+---+---+---+------+ 
| 1| 2| 30|100| -1| 
| 1| 2| 30|100|  0| 
| 1| 2| 30|100|  1| 
| 11| 21| 30|100| -1| 
| 11| 21| 30|100|  0| 
| 11| 21| 30|100|  1| 
+---+---+---+---+------+ 

Что мне нужно сделать, это, вычислить накопленную сумму для V1, V2 для (c1, c2) по смещению.

Я пробовал это, но это далеко от общего решения, которое может работать на любом кадре данных.

import org.apache.spark.sql.expressions.Window 

val groupKey = List("c1", "c2").map(x => col(x.trim)) 
val orderByKey = List("offset").map(x => col(x.trim)) 

val w = Window.partitionBy(groupKey: _*).orderBy(orderByKey: _*) 

val outputDF = inputDF 
    .withColumn("cumulative_v1", sum(inputDF("v1")).over(w)) 
    .withColumn("cumulative_v2", sum(inputDF("v2")).over(w)) 

+---+---+---+---+------+---------------------------- 
| c1| c2| v1| v2|offset|cumulative_v1| cumulative_v2| 
+---+---+---+---+------+-------------|--------------| 
| 1| 2| 30|100| -1|30   | 100   | 
| 1| 2| 30|100|  0|60   | 200   | 
| 1| 2| 30|100|  1|90   | 300   | 
| 11| 21| 30|100| -1|30   | 100   | 
| 11| 21| 30|100|  0|60   | 200   | 
| 11| 21| 30|100|  1|90   | 300   | 
+---+---+---+---+------+----------------------------- 

Задача состоит в том [а] мне нужно делать это через многочисленные и различные окна смещения (от -1 до 1), (от -10 до 10), (от -30 до 30), или любые другие [Ь] Мне нужно использовать эту функцию для нескольких наборов данных/наборов данных, поэтому я надеюсь на универсальную функцию, которая может работать в RDD/Dataset.

Любые мысли о том, как я мог достичь этого в Spark 2.0?

Помощь очень ценится. Благодаря!

+0

Добро пожаловать на переполнение стека! Мы являемся сайтом «вопрос-ответ», а не услугой «Кодеры для найма». Пожалуйста, объясните, что вы пробовали до сих пор, и почему это не сработало. Смотрите: [Почему «Кто-нибудь может мне помочь?» не вопрос?] (http://meta.stackoverflow.com/q/284236) –

+0

Спасибо. Я пришел к приведенному выше набору результатов с моим решением. Добавьте его сейчас. – Yash

ответ

0

Вот примитивный прием с использованием только данных.

import org.apache.spark.sql.expressions.Window 

val groupKey = List("c1", "c2").map(x => col(x.trim)) 
val orderByKey = List("offset").map(x => col(x.trim)) 

val w = Window.partitionBy(groupKey: _*).orderBy(orderByKey: _*) 

val inputDF= spark 
    .sparkContext 
    .parallelize(Seq((1,2,30, 100, -1),(1,2,3, 100, -2),(1,2,140, 100, 2),(1,2,30, 100, 0), (1,2,30, 100, 1),(11,21,30, 100, -1),(11,21,30, 100, 0), (11,21,30, 100, 1)), 10) 
    .toDF("c1", "c2", "v1", "v2", "offset") 

val outputDF = inputDF 
    .withColumn("cumulative_v1", sum(when($"offset".between(-1, 1), inputDF("v1")).otherwise(0)).over(w)) 
    .withColumn("cumulative_v3", sum(when($"offset".between(-2, 2), inputDF("v1")).otherwise(0)).over(w)) 
    .withColumn("cumulative_v2", sum(inputDF("v2")).over(w)) 

Это производит суммарную сумму за одно значение для разных окон.

scala> outputDF.show 
+---+---+---+---+------+-------------+-------------+-------------+    
| c1| c2| v1| v2|offset|cumulative_v1|cumulative_v3|cumulative_v2| 
+---+---+---+---+------+-------------+-------------+-------------+ 
| 1| 2| 3|100| -2|   0|   0|   100| 
| 1| 2| 30|100| -1|   30|   30|   200| 
| 1| 2| 30|100|  0|   60|   60|   300| 
| 1| 2| 30|100|  1|   90|   90|   400| 
| 1| 2|140|100|  2|   90|   90|   500| 
| 11| 21| 30|100| -1|   30|   30|   100| 
| 11| 21| 30|100|  0|   60|   60|   200| 
| 11| 21| 30|100|  1|   90|   90|   300| 
+---+---+---+---+------+-------------+-------------+-------------+ 

Несколько недостатков этого подхода - [1] для каждого условного окна (-1,1), (-2,2) или любой (from_offset, to_offset), сумма() должна быть называется отдельно. [2] это не общая функция.

Я знаю, что искра принимает список переменных столбцов для агрегатных функций, как это -

val exprs = Map("v1" -> "sum", "v2" -> "sum") 

Но я не уверен в том, как расширить это для оконных функций с переменными условиями. Мне все еще очень интересно узнать, есть ли более эффективная и модульная/многоразовая функция, которую мы можем написать, чтобы решить эту проблему.