2016-12-05 21 views
1

У меня есть очень простой вопрос об искре. Обычно я запускаю искровые работы с использованием 50 ядер. При просмотре прогресса работы в большинстве случаев он показывает 50 процессов, работающих параллельно (как это предполагается), но иногда он показывает только 2 или 4 искровых процесса, работающих параллельно. Как это:Спайки и задачи параллелизма

[Stage 8:================================>      (297 + 2)/500] 

ДРР бытия обрабатываемых repartitioned на более чем 100 разделов. Так что это не должно быть проблемой.

У меня есть наблюдения, хотя. Я видел шаблон, который в большинстве случаев это происходит, локальность данных в SparkUI показывает NODE_LOCAL, а в других случаях, когда все 50 процессов запущены, некоторые из процессов показывают RACK_LOCAL. Это заставляет меня сомневаться в том, что это возможно, потому что данные кэшируются перед обработкой в ​​одном узле, чтобы избежать сетевых издержек, и это замедляет дальнейшую обработку.

Если это так, то каким способом избежать этого. И если это не так, что здесь происходит?

ответ

1

После недели или более борьбы с проблемой, я думаю, что нашел причину проблемы.

Если вы боретесь с той же проблемой, хорошей точкой для начала было бы проверить, настроен ли экземпляр Spark в порядке. Об этом есть отличный cloudera blog post.

Однако, если проблема не в конфигурации (как в случае со мной), тогда проблема находится где-то внутри вашего кода. Проблема в том, что иногда из-за разных причин (перекошенные соединения, неравномерные разделы в источниках данных и т. Д.) RDD, над которым вы работаете, получает много данных на 2-3 разделах, а остальные разделы имеют очень мало данных.

Чтобы уменьшить перемещение данных по сети, Spark пытается, чтобы каждый исполнитель обрабатывал данные, находящиеся локально на этом узле. Таким образом, 2-3 исполнителя работают в течение длительного времени, а остальные исполнители просто выполняются с данными за несколько миллисекунд. Вот почему я столкнулся с проблемой, описанной выше.

Способ отладки этой проблемы состоит в том, чтобы в первую очередь проверить размеры разделов вашего RDD. Если один или несколько разделов очень большие по сравнению с другими, то следующим шагом будет поиск записей в больших разделах, чтобы вы могли знать, особенно в случае перекошенных объединений, о том, какой ключ искажается. Я написал небольшую функцию для отладки это:

from itertools import islice 
def check_skewness(df): 
    sampled_rdd = df.sample(False,0.01).rdd.cache() # Taking just 1% sample for fast processing 
    l = sampled_rdd.mapPartitionsWithIndex(lambda x,it: [(x,sum(1 for _ in it))]).collect() 
    max_part = max(l,key=lambda item:item[1]) 
    min_part = min(l,key=lambda item:item[1]) 
    if max_part[1]/min_part[1] > 5: #if difference is greater than 5 times 
     print 'Partitions Skewed: Largest Partition',max_part,'Smallest Partition',min_part,'\nSample Content of the largest Partition: \n' 
     print (sampled_rdd.mapPartitionsWithIndex(lambda i, it: islice(it, 0, 5)  if i == max_part[0] else []).take(5)) 
    else: 
     print 'No Skewness: Largest Partition',max_part,'Smallest Partition',min_part 

Это дает мне самый маленький и самый большой размер раздела, и если разница между этими двумя более чем в 5 раз, он печатает 5 элементов самого большого раздела, должен дать вам общее представление о том, что происходит.

Как только вы выяснили, что проблема связана с перекосом, вы можете найти способ избавиться от этого перекошенного ключа, или вы можете переразделить свою фреймворк данных, что заставит его распределить равномерно, Теперь вы увидите, что все исполнители будут работать в течение равного времени, и вы увидите гораздо менее страшные ошибки OOM, и обработка будет значительно быстрой.

Это всего лишь мои два цента в качестве новичка Spark, надеюсь эксперты Spark могут добавить еще кое-что к этой проблеме, так как я думаю, что много новичков в мире Spark сталкиваются с подобными проблемами слишком часто.