У меня есть dataframe, который имеет несколько атрибутов (от C1 до C2), смещение (в днях) и несколько значений (V1, V2).Spark Dataframe/Dataset: общая условная суммарная сумма
val inputDF= spark.sparkContext.parallelize(Seq((1,2,30, 100, -1),(1,2,30, 100, 0), (1,2,30, 100, 1),(11,21,30, 100, -1),(11,21,30, 100, 0), (11,21,30, 100, 1)), 10).toDF("c1", "c2", "v1", "v2", "offset")
inputDF: org.apache.spark.sql.DataFrame = [c1: int, c2: int ... 3 more fields]
scala> inputDF.show
+---+---+---+---+------+
| c1| c2| v1| v2|offset|
+---+---+---+---+------+
| 1| 2| 30|100| -1|
| 1| 2| 30|100| 0|
| 1| 2| 30|100| 1|
| 11| 21| 30|100| -1|
| 11| 21| 30|100| 0|
| 11| 21| 30|100| 1|
+---+---+---+---+------+
Что мне нужно сделать, это, вычислить накопленную сумму для V1, V2 для (c1, c2) по смещению.
Я пробовал это, но это далеко от общего решения, которое может работать на любом кадре данных.
import org.apache.spark.sql.expressions.Window
val groupKey = List("c1", "c2").map(x => col(x.trim))
val orderByKey = List("offset").map(x => col(x.trim))
val w = Window.partitionBy(groupKey: _*).orderBy(orderByKey: _*)
val outputDF = inputDF
.withColumn("cumulative_v1", sum(inputDF("v1")).over(w))
.withColumn("cumulative_v2", sum(inputDF("v2")).over(w))
+---+---+---+---+------+----------------------------
| c1| c2| v1| v2|offset|cumulative_v1| cumulative_v2|
+---+---+---+---+------+-------------|--------------|
| 1| 2| 30|100| -1|30 | 100 |
| 1| 2| 30|100| 0|60 | 200 |
| 1| 2| 30|100| 1|90 | 300 |
| 11| 21| 30|100| -1|30 | 100 |
| 11| 21| 30|100| 0|60 | 200 |
| 11| 21| 30|100| 1|90 | 300 |
+---+---+---+---+------+-----------------------------
Задача состоит в том [а] мне нужно делать это через многочисленные и различные окна смещения (от -1 до 1), (от -10 до 10), (от -30 до 30), или любые другие [Ь] Мне нужно использовать эту функцию для нескольких наборов данных/наборов данных, поэтому я надеюсь на универсальную функцию, которая может работать в RDD/Dataset.
Любые мысли о том, как я мог достичь этого в Spark 2.0?
Помощь очень ценится. Благодаря!
Добро пожаловать на переполнение стека! Мы являемся сайтом «вопрос-ответ», а не услугой «Кодеры для найма». Пожалуйста, объясните, что вы пробовали до сих пор, и почему это не сработало. Смотрите: [Почему «Кто-нибудь может мне помочь?» не вопрос?] (http://meta.stackoverflow.com/q/284236) –
Спасибо. Я пришел к приведенному выше набору результатов с моим решением. Добавьте его сейчас. – Yash