2017-01-03 10 views
4

функция должна быть выполнена для нескольких столбцов в кадре данныхСпарк миграции SQL функцию окна, чтобы RDD для лучшей производительности

def handleBias(df: DataFrame, colName: String, target: String = target) = { 
    val w1 = Window.partitionBy(colName) 
    val w2 = Window.partitionBy(colName, target) 

    df.withColumn("cnt_group", count("*").over(w2)) 
     .withColumn("pre2_" + colName, mean(target).over(w1)) 
     .withColumn("pre_" + colName, coalesce(min(col("cnt_group")/col("cnt_foo_eq_1")).over(w1), lit(0D))) 
     .drop("cnt_group") 
    } 

Это может быть написан хорошо, как показано выше, в искровом-SQL и для цикла. Однако это вызывает много перетасовки (spark apply function to columns in parallel).

Минимальный пример:

val df = Seq(
    (0, "A", "B", "C", "D"), 
    (1, "A", "B", "C", "D"), 
    (0, "d", "a", "jkl", "d"), 
    (0, "d", "g", "C", "D"), 
    (1, "A", "d", "t", "k"), 
    (1, "d", "c", "C", "D"), 
    (1, "c", "B", "C", "D") 
).toDF("TARGET", "col1", "col2", "col3TooMany", "col4") 

    val columnsToDrop = Seq("col3TooMany") 
    val columnsToCode = Seq("col1", "col2") 
    val target = "TARGET" 

    val targetCounts = df.filter(df(target) === 1).groupBy(target) 
    .agg(count(target).as("cnt_foo_eq_1")) 
    val newDF = df.join(broadcast(targetCounts), Seq(target), "left") 

    val result = (columnsToDrop ++ columnsToCode).toSet.foldLeft(newDF) { 
    (currentDF, colName) => handleBias(currentDF, colName) 
    } 

    result.drop(columnsToDrop: _*).show 

Как я могу сформулировать это более эффективно, используя RDD API? aggregateByKey должно быть хорошей идеей, но мне все еще не очень понятно, как применить его здесь, чтобы заменить функции окна.

(обеспечивает немного больше контекста/больше пример https://github.com/geoHeil/sparkContrastCoding)

редактировать

Первоначально я начал с Spark dynamic DAG is a lot slower and different from hard coded DAG, который показан ниже. Хорошо, что каждый столбец работает независимо/параллельно. Недостатком является то, что соединения (даже для небольшого набора данных 300 МБ) становятся «слишком большими» и приводят к невосприимчивой искры.

handleBiasOriginal("col1", df) 
    .join(handleBiasOriginal("col2", df), df.columns) 
    .join(handleBiasOriginal("col3TooMany", df), df.columns) 
    .drop(columnsToDrop: _*).show 

    def handleBiasOriginal(col: String, df: DataFrame, target: String = target): DataFrame = { 
    val pre1_1 = df 
     .filter(df(target) === 1) 
     .groupBy(col, target) 
     .agg((count("*")/df.filter(df(target) === 1).count).alias("pre_" + col)) 
     .drop(target) 

    val pre2_1 = df 
     .groupBy(col) 
     .agg(mean(target).alias("pre2_" + col)) 

    df 
     .join(pre1_1, Seq(col), "left") 
     .join(pre2_1, Seq(col), "left") 
     .na.fill(0) 
    } 

Это изображение с искрой 2.1.0, изображения из Spark dynamic DAG is a lot slower and different from hard coded DAG являются с 2.0.2 toocomplexDAG

ГПДР будет немного проще, когда кэширование применяется df.cache handleBiasOriginal (» col1 ", df). ...

Какие еще возможности, чем функции окна вы видите для оптимизации SQL? В лучшем случае было бы здорово, если бы SQL был сгенерирован динамически.

caching

ответ

1

Главное, чтобы избежать ненужных перетасовки. Прямо сейчас ваш код перемещается дважды для каждого столбца, который вы хотите включить, и результирующий макет данных не может быть повторно использован между столбцами.

Для простоты я предполагаю, что target всегда двоичный ({0, 1}), а все остальные используемые вами столбцы - StringType. Кроме того, я предполагаю, что мощность столбцов достаточно мала, чтобы результаты были сгруппированы и обработаны локально. Вы можете настроить эти методы для обработки других случаев, но для этого требуется больше работы.

РДД API

  • RESHAPE данные от широкоугольного к давно:

    import org.apache.spark.sql.functions._ 
    
    val exploded = explode(array(
        (columnsToDrop ++ columnsToCode).map(c => 
        struct(lit(c).alias("k"), col(c).alias("v"))): _* 
    )).alias("level") 
    
    val long = df.select(exploded, $"TARGET") 
    
  • aggregateByKey, перекроить и собирающие:

    import org.apache.spark.util.StatCounter 
    
    val lookup = long.as[((String, String), Int)].rdd 
        // You can use prefix partitioner (one that depends only on _._1) 
        // to avoid reshuffling for groupByKey 
        .aggregateByKey(StatCounter())(_ merge _, _ merge _) 
        .map { case ((c, v), s) => (c, (v, s)) } 
        .groupByKey 
        .mapValues(_.toMap) 
        .collectAsMap 
    
  • Вы можете использовать lookup т o получать статистику по отдельным столбцам и уровням. Например:

    lookup("col1")("A") 
    
    org.apache.spark.util.StatCounter = 
        (count: 3, mean: 0.666667, stdev: 0.471405, max: 1.000000, min: 0.000000) 
    

    дает вам данные для col1, уровень A. Основываясь на двоичном предложении TARGET, эта информация завершена (вы получаете счет/доли для обоих классов).

    Вы можете использовать поиск, подобный этому, для генерации SQL-выражений или передачи его на udf и применить его к отдельным столбцам.

DataFrame API

  • Преобразование данных пока для RDD API.
  • Compute агрегаты, основанные на уровнях:

    val stats = long 
        .groupBy($"level.k", $"level.v") 
        .agg(mean($"TARGET"), sum($"TARGET")) 
    
  • В зависимости от ваших предпочтений вы можете изменить это для того, чтобы эффективнее присоединяется или преобразовать в локальную коллекцию, и аналогично решению RDD.

0

Использование aggregateByKey Простое объяснение aggregateByKey можно найти here. В основном вы используете две функции: одну, которая работает внутри раздела и которая работает между разделами.

Вам нужно будет сделать что-то вроде агрегата в первом столбце и построить структуру данных внутри с картой для каждого элемента второго столбца для агрегирования и сбора данных там (конечно, вы могли бы сделать два aggregateByKey, если хотите) , Это не решит случай выполнения нескольких прогонов кода для каждого столбца, с которым вы хотите работать (вы можете использовать агрегат в отличие от aggregateByKey для работы со всеми данными и поместить его на карту, но это, вероятно, даст вам даже хуже производительность). Результатом будет одна строка на один ключ, если вы хотите вернуться к исходным записям (как это делает функция окна), вам действительно нужно будет либо присоединить это значение к исходному RDD, либо сохранить все значения внутри и на плоской карте

Я не верю, что это обеспечит вам любое реальное улучшение производительности. Вы бы много работали над переопределением вещей, которые выполняются для вас в SQL, и при этом вы потеряете большинство преимуществ SQL (оптимизация катализатора, управление памятью вольфрама, генерация всего кода сцены и т. Д.)

Улучшение SQL

Что бы сделать вместо этого попытка улучшить саму SQL. Например, результат столбца в функции окна для всех значений является одинаковым. Вам действительно нужна функция окна? Вместо этого вы можете использовать groupBy вместо функции окна (и если вам действительно нужна эта запись, вы можете попытаться присоединиться к результатам, что может обеспечить лучшую производительность, поскольку это не обязательно означает перетасовку всего на каждом шаге).

+0

см http://stackoverflow.com/questions/41169873/spark-dynamic-dag-is-a-lot-slower-and-different-from-hard-coded-dag, а также мой редактировать выше. Вначале я начал использовать групповое соединение с объединениями. Это привело к тому, что работа, не закончившаяся в разумные сроки/лонжерон, похоже, не выполняла никакой операции.Хотя решение соединения отлично работает для небольших данных, я не мог заставить его работать со многими столбцами. С нетерпением ждем предложений по улучшению SQL. –

+0

Я не говорю, что соединение обязательно является решением. Что я говорю, что в большинстве случаев RDD с aggregateByKey будет медленнее. Вы можете пойти и попробовать aggregateByKey, используя ссылку, которую я показал, и основную логику ее реализации. –

+0

Между тем, вы видите способ не использовать медленные функции окна, но все же препятствовать использованию соединения? –

 Смежные вопросы

  • Нет связанных вопросов^_^