2016-05-21 3 views
1

Я использую pyspark для обработки своих данных, и в самом конце мне нужно собрать данные из rdd, используя rdd.collect(). Однако моя искра падает из-за проблемы с памятью. Я пробовал несколько способов, но не повезло. Я теперь работает со следующим кодом, процесс небольшой фрагмент данных для каждого раздела:Какова наилучшая практика для сбора большого набора данных из искры rdd?

def make_part_filter(index): 
    def part_filter(split_index, iterator): 
     if split_index == index: 
      for el in iterator: 
       yield el 
    return part_filter 


for part_id in range(rdd.getNumPartitions()): 
    part_rdd = rdd.mapPartitionsWithIndex(make_part_filter(part_id), True) 
    myCollection = part_rdd.collect() 
    for row in myCollection: 
      #Do something with each row 

Новый код настоящее время я использую не откажет, но, кажется, работает навсегда.

Есть ли лучший способ сбора данных с большого rdd?

+1

Вместо запуска для цикла в формате списка из RDD, почему бы вам не запустить функцию карты? –

+0

На самом деле, мне нужно собрать все данные в rdd и сохранить в большом массиве, а затем подать модуль машинного обучения. – JamesLi

+1

Возможно, модуль машинного обучения принимает итератор, или он действительно хочет массив? С помощью итератора вы можете не загружать сразу все данные в память. Даже тогда я буду беспокоиться о производительности, так как я предполагаю, что модуль машинного обучения собирается «съедать» данные одним потоком. –

ответ

1

Попытка «собрать» огромное RDD проблематична. «Сбор» возвращает список, что означает, что весь контент RDD должен храниться в памяти драйвера. Это проблема «showstopper». Обычно требуется, чтобы приложение Spark могло обрабатывать наборы данных, размер которых значительно превышает то, что соответствовало бы памяти одного узла.

Предположим, что RDD едва входит в память и «собирает» работы. Тогда у нас есть еще один «showstopper» - низкая производительность. В вашем коде собранный RDD обрабатывается в цикле: «для строки в myCollection». Этот цикл выполняется ровно одним ядром. Поэтому вместо обработки данных через RDD, вычисления которых распределяются между всеми ядрами кластера, из которых, вероятно, 100, если не 1000, вместо этого вся работа над всем набором данных размещается на обратной стороне одного ядро.

0

Я не знаю, если это лучший способ, но это лучший способ, который я пробовал. Не уверен, что это лучше или хуже вашего. Такая же идея, разбивая ее на куски, но вы можете быть более гибкими с размером куска.

def rdd_iterate(rdd, chunk_size=1000000): 
    indexed_rows = rdd.zipWithIndex().cache() 
    count = indexed_rows.count() 
    print("Will iterate through RDD of count {}".format(count)) 
    start = 0 
    end = start + chunk_size 
    while start < count: 
     print("Grabbing new chunk: start = {}, end = {}".format(start, end)) 
     chunk = indexed_rows.filter(lambda r: r[1] >= start and r[1] < end).collect() 
     for row in chunk: 
      yield row[0] 
     start = end 
     end = start + chunk_size 

Пример использования, где я хочу, чтобы добавить огромный RDD в файл CSV на диске никогда не заселять список Python со всем РДУ:

def rdd_to_csv(fname, rdd): 
    import csv 
    f = open(fname, "a") 
    c = csv.writer(f) 
    for row in rdd_iterate(rdd): # with abstraction, iterates through entire RDD 
     c.writerows([row]) 
    f.close() 

rdd_to_csv("~/test.csv", my_really_big_rdd)