2015-06-12 3 views
1

UPD: Вопрос уже недействителен, поскольку оказалось, что две из 100 таблиц имели на несколько порядков больше строк, чем остальные (у которого было 500). Когда «плохие» таблицы удаляются, объединение распределяется справедливо и завершается в предсказуемое время.одно задание занимает очень много времени при множественном левом объединении в Spark-SQL (1.3.1)


У меня есть около 100 Свечи DataFrames, < = 500 строк каждый, но примерно такого же размера (планирует иметь несколько десятков тысяч строк позже). Иды записей всех столбцов являются подмножествами идентификаторов первой (справочной) таблицы.

Я хочу, чтобы внешнее соединение соединяло все таблицы с первым по id. Я делаю это следующим образом (в pyspark):

df1.join(df2, df2.id == df1.id, 'left_outer') 
    .join(df3, df3.id == df1.id, 'left_outer') 
    ... 

Это операции соединения генерирует 200 рабочих мест, все из которых лишь несколько финишем в пару секунд. Последняя работа, однако, занимает очень много времени (час или около того) и работает (очевидно) только на одном процессоре. Пользовательский интерфейс искровой сети показывает, что эта работа приобрела слишком много записей в случайном порядке.

Почему это происходит и как лучше настроить Spark, чтобы избежать этого?


enter image description here


Запрос "объяснить SELECT * FROM ... LEFT OUTER JOIN ... ... ..." выглядит следующим образом:

== Physical Plan == 
Project [id#0, ... rest of the columns (~205) ...] 
HashOuterJoin [id#0], [id#370], LeftOuter, None 
    HashOuterJoin [id#0], [id#367], LeftOuter, None 
    HashOuterJoin [id#0], [id#364], LeftOuter, None 
    ... 
    Exchange (HashPartitioning [id#364], 200) 
    Project [...cols...] 
    PhysicalRDD [...cols...], MapPartitionsRDD[183] at map at newParquet.scala:542 
    Exchange (HashPartitioning [id#367], 200) 
    Project [...cols...] 
    PhysicalRDD [..cols...], MapPartitionsRDD[185] at map at newParquet.scala:542 
Exchange (HashPartitioning [id#370], 200) 
    Project [...cols...] 
    PhysicalRDD [...cols...], MapPartitionsRDD[187] at map at newParquet.scala:542 
+0

Вы пытались запустить объяснение по запросу? – Holden

+0

@Holden Я просто попробовал и опубликовал сводку своего вывода. Оказывает ли это больше света на корень проблемы? – dmytro

ответ

1

Использование перераспределение после присоединения может помочь.

У меня были подобные ситуации. Соедините два dfs с 200 разделами и снова присоединитесь, и это никогда не заканчивается.

Я попытался добавить раздел (50) в DF, который будет соединен, и тогда он сработает.

+0

Эй, спасибо за предложение. На самом деле оказалось, что две из 100 таблиц имели гораздо большее количество строк (не 500, как я утверждал), поэтому мой вопрос уже недействителен. Тем не менее, у меня такая же проблема, когда я присоединяюсь к двум большим таблицам, и переделы не помогают :( – dmytro