После недели или более борьбы с проблемой, я думаю, что нашел причину проблемы.
Если вы боретесь с той же проблемой, хорошей точкой для начала было бы проверить, настроен ли экземпляр Spark в порядке. Об этом есть отличный cloudera blog post.
Однако, если проблема не в конфигурации (как в случае со мной), тогда проблема находится где-то внутри вашего кода. Проблема в том, что иногда из-за разных причин (перекошенные соединения, неравномерные разделы в источниках данных и т. Д.) RDD, над которым вы работаете, получает много данных на 2-3 разделах, а остальные разделы имеют очень мало данных.
Чтобы уменьшить перемещение данных по сети, Spark пытается, чтобы каждый исполнитель обрабатывал данные, находящиеся локально на этом узле. Таким образом, 2-3 исполнителя работают в течение длительного времени, а остальные исполнители просто выполняются с данными за несколько миллисекунд. Вот почему я столкнулся с проблемой, описанной выше.
Способ отладки этой проблемы состоит в том, чтобы в первую очередь проверить размеры разделов вашего RDD. Если один или несколько разделов очень большие по сравнению с другими, то следующим шагом будет поиск записей в больших разделах, чтобы вы могли знать, особенно в случае перекошенных объединений, о том, какой ключ искажается. Я написал небольшую функцию для отладки это:
from itertools import islice
def check_skewness(df):
sampled_rdd = df.sample(False,0.01).rdd.cache() # Taking just 1% sample for fast processing
l = sampled_rdd.mapPartitionsWithIndex(lambda x,it: [(x,sum(1 for _ in it))]).collect()
max_part = max(l,key=lambda item:item[1])
min_part = min(l,key=lambda item:item[1])
if max_part[1]/min_part[1] > 5: #if difference is greater than 5 times
print 'Partitions Skewed: Largest Partition',max_part,'Smallest Partition',min_part,'\nSample Content of the largest Partition: \n'
print (sampled_rdd.mapPartitionsWithIndex(lambda i, it: islice(it, 0, 5) if i == max_part[0] else []).take(5))
else:
print 'No Skewness: Largest Partition',max_part,'Smallest Partition',min_part
Это дает мне самый маленький и самый большой размер раздела, и если разница между этими двумя более чем в 5 раз, он печатает 5 элементов самого большого раздела, должен дать вам общее представление о том, что происходит.
Как только вы выяснили, что проблема связана с перекосом, вы можете найти способ избавиться от этого перекошенного ключа, или вы можете переразделить свою фреймворк данных, что заставит его распределить равномерно, Теперь вы увидите, что все исполнители будут работать в течение равного времени, и вы увидите гораздо менее страшные ошибки OOM, и обработка будет значительно быстрой.
Это всего лишь мои два цента в качестве новичка Spark, надеюсь эксперты Spark могут добавить еще кое-что к этой проблеме, так как я думаю, что много новичков в мире Spark сталкиваются с подобными проблемами слишком часто.