У меня есть два искровых RDD, A имеет 301 500 000 строк и B имеет 150000 строк. Эти 1,5 миллиона строк в B все также появляются в A. Я хотел бы установить разницу между этими двумя RDD, так что я возвращаю A с 300 000 000 строк, причем эти 150000 строк из B больше не присутствуют в A.Выполнение отличия разметки на RDD в Spark Python
Я не могу использовать Spark DataFrames.
Вот система, которую я использую прямо сейчас. Эти RDD имеют первичные ключи. То, что я делаю ниже, это создать (собранный) список тех первичных ключей, которые появляются в B, а затем перебрать первичные ключи A, чтобы найти те, которые не отображаются в списке основных ключей B.
a = sc.parallelize([[0,"foo",'a'],[1,'bar','b'],[2,'mix','c'],[3,'hem', 'd'],[4,'line','e']])
b = sc.parallelize([[1,'bar','b'],[2,'mix','c']])
b_primary_keys = b.map(lambda x: x[0]).collect() # since first col = primary key
def sep_a_and_b(row):
primary_key = row[0]
if(primary_key not in b_primary_keys):
return(row)
a_minus_b = a.map(lambda x: sep_a_and_b(x)).filter(lambda x: x != None)
Теперь это работает в этом примере проблемы, потому что A и B являются крошечными. Однако это неуспешно, когда я использую свои настоящие наборы данных A и B. Есть ли лучший (более параллельный) способ реализовать это?
Я последовательно получаю сообщение об ошибке «ValueError: слишком много значений для распаковки» на любом большом наборе данных (я также попытался это сделать на наборе данных, где A = 20000000 строк и B = 6000 строк) , Мое впечатление, что компилятор должен распаковать A, который слишком велик для хранения в основной памяти. –
Возможно, связано с https://stackoverflow.com/questions/7053551/python-valueerror-too-many-values-to-unpack –
Исправить. Похоже, что проблема связана не с размером, а с проблемой наличия RDD со многими столбцами, а не только с парами '(ключ, значение). –