функция должна быть выполнена для нескольких столбцов в кадре данныхСпарк миграции SQL функцию окна, чтобы RDD для лучшей производительности
def handleBias(df: DataFrame, colName: String, target: String = target) = {
val w1 = Window.partitionBy(colName)
val w2 = Window.partitionBy(colName, target)
df.withColumn("cnt_group", count("*").over(w2))
.withColumn("pre2_" + colName, mean(target).over(w1))
.withColumn("pre_" + colName, coalesce(min(col("cnt_group")/col("cnt_foo_eq_1")).over(w1), lit(0D)))
.drop("cnt_group")
}
Это может быть написан хорошо, как показано выше, в искровом-SQL и для цикла. Однако это вызывает много перетасовки (spark apply function to columns in parallel).
Минимальный пример:
val df = Seq(
(0, "A", "B", "C", "D"),
(1, "A", "B", "C", "D"),
(0, "d", "a", "jkl", "d"),
(0, "d", "g", "C", "D"),
(1, "A", "d", "t", "k"),
(1, "d", "c", "C", "D"),
(1, "c", "B", "C", "D")
).toDF("TARGET", "col1", "col2", "col3TooMany", "col4")
val columnsToDrop = Seq("col3TooMany")
val columnsToCode = Seq("col1", "col2")
val target = "TARGET"
val targetCounts = df.filter(df(target) === 1).groupBy(target)
.agg(count(target).as("cnt_foo_eq_1"))
val newDF = df.join(broadcast(targetCounts), Seq(target), "left")
val result = (columnsToDrop ++ columnsToCode).toSet.foldLeft(newDF) {
(currentDF, colName) => handleBias(currentDF, colName)
}
result.drop(columnsToDrop: _*).show
Как я могу сформулировать это более эффективно, используя RDD API? aggregateByKey
должно быть хорошей идеей, но мне все еще не очень понятно, как применить его здесь, чтобы заменить функции окна.
(обеспечивает немного больше контекста/больше пример https://github.com/geoHeil/sparkContrastCoding)
редактировать
Первоначально я начал с Spark dynamic DAG is a lot slower and different from hard coded DAG, который показан ниже. Хорошо, что каждый столбец работает независимо/параллельно. Недостатком является то, что соединения (даже для небольшого набора данных 300 МБ) становятся «слишком большими» и приводят к невосприимчивой искры.
handleBiasOriginal("col1", df)
.join(handleBiasOriginal("col2", df), df.columns)
.join(handleBiasOriginal("col3TooMany", df), df.columns)
.drop(columnsToDrop: _*).show
def handleBiasOriginal(col: String, df: DataFrame, target: String = target): DataFrame = {
val pre1_1 = df
.filter(df(target) === 1)
.groupBy(col, target)
.agg((count("*")/df.filter(df(target) === 1).count).alias("pre_" + col))
.drop(target)
val pre2_1 = df
.groupBy(col)
.agg(mean(target).alias("pre2_" + col))
df
.join(pre1_1, Seq(col), "left")
.join(pre2_1, Seq(col), "left")
.na.fill(0)
}
Это изображение с искрой 2.1.0, изображения из Spark dynamic DAG is a lot slower and different from hard coded DAG являются с 2.0.2
ГПДР будет немного проще, когда кэширование применяется df.cache handleBiasOriginal (» col1 ", df). ...
Какие еще возможности, чем функции окна вы видите для оптимизации SQL? В лучшем случае было бы здорово, если бы SQL был сгенерирован динамически.
см http://stackoverflow.com/questions/41169873/spark-dynamic-dag-is-a-lot-slower-and-different-from-hard-coded-dag, а также мой редактировать выше. Вначале я начал использовать групповое соединение с объединениями. Это привело к тому, что работа, не закончившаяся в разумные сроки/лонжерон, похоже, не выполняла никакой операции.Хотя решение соединения отлично работает для небольших данных, я не мог заставить его работать со многими столбцами. С нетерпением ждем предложений по улучшению SQL. –
Я не говорю, что соединение обязательно является решением. Что я говорю, что в большинстве случаев RDD с aggregateByKey будет медленнее. Вы можете пойти и попробовать aggregateByKey, используя ссылку, которую я показал, и основную логику ее реализации. –
Между тем, вы видите способ не использовать медленные функции окна, но все же препятствовать использованию соединения? –