2015-09-29 4 views
3

У меня есть два искровых RDD, A имеет 301 500 000 строк и B имеет 150000 строк. Эти 1,5 миллиона строк в B все также появляются в A. Я хотел бы установить разницу между этими двумя RDD, так что я возвращаю A с 300 000 000 строк, причем эти 150000 строк из B больше не присутствуют в A.Выполнение отличия разметки на RDD в Spark Python

Я не могу использовать Spark DataFrames.

Вот система, которую я использую прямо сейчас. Эти RDD имеют первичные ключи. То, что я делаю ниже, это создать (собранный) список тех первичных ключей, которые появляются в B, а затем перебрать первичные ключи A, чтобы найти те, которые не отображаются в списке основных ключей B.

a = sc.parallelize([[0,"foo",'a'],[1,'bar','b'],[2,'mix','c'],[3,'hem', 'd'],[4,'line','e']]) 
b = sc.parallelize([[1,'bar','b'],[2,'mix','c']]) 
b_primary_keys = b.map(lambda x: x[0]).collect() # since first col = primary key 


def sep_a_and_b(row): 
    primary_key = row[0] 
    if(primary_key not in b_primary_keys): 
     return(row) 


a_minus_b = a.map(lambda x: sep_a_and_b(x)).filter(lambda x: x != None) 

Теперь это работает в этом примере проблемы, потому что A и B являются крошечными. Однако это неуспешно, когда я использую свои настоящие наборы данных A и B. Есть ли лучший (более параллельный) способ реализовать это?

ответ

4

Это похоже на то, вы можете решить с subtractByKey

val filteredA = a.subtractByKey(b) 

Для перехода к значению ключа:

val keyValRDD = rdd.map(lambda x: (x[:1],x[1:])) 

* Обратите внимание, что мой питон слаб и что может быть лучше способов разбиения Значения

+0

Я последовательно получаю сообщение об ошибке «ValueError: слишком много значений для распаковки» на любом большом наборе данных (я также попытался это сделать на наборе данных, где A = 20000000 строк и B = 6000 строк) , Мое впечатление, что компилятор должен распаковать A, который слишком велик для хранения в основной памяти. –

+0

Возможно, связано с https://stackoverflow.com/questions/7053551/python-valueerror-too-many-values-to-unpack –

+0

Исправить. Похоже, что проблема связана не с размером, а с проблемой наличия RDD со многими столбцами, а не только с парами '(ключ, значение). –