2

У меня есть операция в искровом свете, которая должна выполняться для нескольких столбцов в кадре данных. Как правило, есть 2 возможности указать такие операцииSpark динамический DAG намного медленнее и отличается от жестко закодированного DAG

  • жёстко
handleBias("bar", df) 
    .join(handleBias("baz", df), df.columns) 
    .drop(columnsToDrop: _*).show 
  • динамически генерировать их из списка COLNAMES
var isFirst = true 
var res = df 
for (col <- columnsToDrop ++ columnsToCode) { 
    if (isFirst) { 
    res = handleBias(col, res) 
    isFirst = false 
    } else { 
    res = handleBias(col, res) 
    } 
} 
res.drop(columnsToDrop: _*).show 

Проблема заключается в том, что DAG, генерируемый динамически, отличается и время выполнения dy namic решение увеличивается гораздо больше, когда используется больше столбцов, чем для жестко закодированных операций.

Мне любопытно, как сочетать элегантность динамичной конструкции с быстрым временем выполнения.

Вот сравнение для групп DAG в примере кода complexity comparison

Для около 80 столбцов это приводит к довольно хороший графике для жесткого кодировкой варианта hardCoded И очень большой, вероятно, менее параллелизуемое и медленной DAG для динамически построенного запроса. hugeMessDynamic

Текущая версия искры (2.0.2) использовали с DataFrames и искровым SQL

код завершить минимальный пример:

def handleBias(col: String, df: DataFrame, target: String = "FOO"): DataFrame = { 
    val pre1_1 = df 
    .filter(df(target) === 1) 
    .groupBy(col, target) 
    .agg((count("*")/df.filter(df(target) === 1).count).alias("pre_" + col)) 
    .drop(target) 

    val pre2_1 = df 
    .groupBy(col) 
    .agg(mean(target).alias("pre2_" + col)) 

    df 
    .join(pre1_1, Seq(col), "left") 
    .join(pre2_1, Seq(col), "left") 
    .na.fill(0) 
} 

редактировать

Запуск задачи с foldleft генерирует линейный DAG foldleft и жесткое кодирование функции для всех результатов столбцов в hardcoded

Оба намного лучше, чем мои оригинальные DAG, но, тем не менее, жесткий код выглядит лучше для меня. Строка, объединяющая инструкцию SQL в искровом режиме, может позволить мне динамически генерировать граф с жестким кодированием, но это кажется довольно уродливым. Вы видите другой вариант?

+0

Я думаю, что проблема в том, что ваша функция «handleBias» очень сложна, и вам нужно запустить ее для нескольких столбцов. Даже если вы сделаете это жестко запрограммированным для многих столбцов, ваша DAG будет большой, поэтому, возможно, проблема не применяется «динамически», а применяется ко многим столбцам. Итак, если вы можете придумать способ адаптировать свою функцию для обработки нескольких столбцов одновременно, это может значительно помочь. –

+0

@ DanieldePaula Вы видите какой-либо способ выразить этот метод более простым способом, чтобы требовалось меньше вычислительной мощности? –

+0

К сожалению, у меня нет много времени, чтобы подумать об этом прямо сейчас, извините. Если завтра вы не найдете решения, я посмотрю на него. –

ответ

2

Редактировать 1: Удалена одна функция окна из handleBias и преобразована в широковещательное соединение.

Редактировать 2: Изменена стратегия замены для нулевых значений.

У меня есть некоторые предложения, которые могут улучшить ваш код.Во-первых, для функции «handleBias», я хотел бы сделать это с помощью окна функции и «withColumn» вызовы, избегая объединений:

import org.apache.spark.sql.DataFrame 
import org.apache.spark.sql.functions._ 
import org.apache.spark.sql.expressions.Window 

def handleBias(df: DataFrame, colName: String, target: String = "foo") = { 
    val w1 = Window.partitionBy(colName) 
    val w2 = Window.partitionBy(colName, target) 
    val result = df 
    .withColumn("cnt_group", count("*").over(w2)) 
    .withColumn("pre2_" + colName, mean(target).over(w1)) 
    .withColumn("pre_" + colName, coalesce(min(col("cnt_group")/col("cnt_foo_eq_1")).over(w1), lit(0D))) 
    .drop("cnt_group") 
    result 
} 

Тогда для вызова его на несколько столбцов, я бы рекомендовал использовать foldLeft, который является «функциональный» подход к такого рода проблемы:

val df = Seq((1, "first", "A"), (1, "second", "A"),(2, "noValidFormat", "B"),(1, "lastAssumingSameDate", "C")).toDF("foo", "bar", "baz") 

val columnsToDrop = Seq("baz") 
val columnsToCode = Seq("bar", "baz") 
val target = "foo" 

val targetCounts = df.filter(df(target) === 1).groupBy(target) 
    .agg(count(target).as("cnt_foo_eq_1")) 
val newDF = df.join(broadcast(targetCounts), Seq(target), "left") 

val result = (columnsToDrop ++ columnsToCode).toSet.foldLeft(df) { 
    (currentDF, colName) => handleBias(currentDF, colName) 
} 

result.drop(columnsToDrop:_*).show() 

+---+--------------------+------------------+--------+------------------+--------+ 
|foo|     bar|   pre_baz|pre2_baz|   pre_bar|pre2_bar| 
+---+--------------------+------------------+--------+------------------+--------+ 
| 2|  noValidFormat|    0.0|  2.0|    0.0|  2.0| 
| 1|lastAssumingSameDate|0.3333333333333333|  1.0|0.3333333333333333|  1.0| 
| 1|    second|0.6666666666666666|  1.0|0.3333333333333333|  1.0| 
| 1|    first|0.6666666666666666|  1.0|0.3333333333333333|  1.0| 
+---+--------------------+------------------+--------+------------------+--------+ 

Я не уверен, что это улучшит много ваш DAG, но, по крайней мере, это делает уборщик кода и более удобным для чтения.

Ссылка:

+1

большое спасибо за этот отличный ответ. Мне все еще нужно проверить его на более крупные данные. См. Редактирование/2 разных типа DAG, сгенерированных вашим кодом для жесткого кодирования и операции сложения. Почему эти «те же»? –

+0

@GeorgHeiler они разные, потому что ваша жестко кодированная версия использует объединения, что обычно хуже. Линейный DAG означает, что в нем нет объединений, и я думаю, что он выглядит лучше, чем другой. После того, как вы попробуете больше данных, сообщите мне, что быстрее –

+0

'.withColumn (« pre_ »+ colName, coalesce (col (« cnt_group »)/col (« cnt_foo_eq_1 »), горит (0D)))' is not чего я хочу достичь, а не подстановки 0 Я хочу заменить все нулевые значения на соответствующее значение класса == 1, чтобы, например, для A & foo = 1, который равен 0,5, используется в качестве замены для всех A & foo = 0, которые являются нулевыми. –