2016-08-19 1 views
2

У меня есть РДД, как это:Как сгруппировать и добавить в искру?

{"key1" : "fruit" , "key2" : "US" , "key3" : "1" } 

{"key1" : "fruit" , "key2" : "US" , "key3" : "2" } 

{"key1" : "vegetable" , "key2" : "US" , "key3" : "1" } 

{"key1" : "fruit" , "key2" : "Japan" , "key3" : "3" } 

{"key1" : "vegetable" , "key2" : "Japan" , "key3" : "3" } 

Моя цель состоит в том, чтобы первой группе ключом1, а затем группа по key2 и, наконец, добавить KEY3.

Я ожидаю конечный результат, как,

key1   key2  key3 
"fruit"  , "US" , 3 
"vegetable" , "US" , 1 
"fruit"  , "Japan" , 3 
"vegetable" , "Japan" , 3 

Мой код начинается, как показано ниже,

rdd_arm = rdd_arm.map(lambda x: x[1]) 

rdd_arm включает выше ключ: формат значения.

Я не уверен, куда идти дальше. Может кто-нибудь помочь мне?

ответ

1

Давайте создадим свой RDD:

In [1]: rdd_arm = sc.parallelize([{"key1" : "fruit" , "key2" : "US" , "key3" : "1" }, {"key1" : "fruit" , "key2" : "US" , "key3" : "2" }, {"key1" : "vegetable" , "key2" : "US" , "key3" : "1" }, {"key1" : "fruit" , "key2" : "Japan" , "key3" : "3" }, {"key1" : "vegetable" , "key2" : "Japan" , "key3" : "3" }]) 
In [2]: rdd_arm.collect() 
Out[2]: 
[{'key1': 'fruit', 'key2': 'US', 'key3': '1'}, 
{'key1': 'fruit', 'key2': 'US', 'key3': '2'}, 
{'key1': 'vegetable', 'key2': 'US', 'key3': '1'}, 
{'key1': 'fruit', 'key2': 'Japan', 'key3': '3'}, 
{'key1': 'vegetable', 'key2': 'Japan', 'key3': '3'}] 

Во-первых, вы должны создать новый ключ, который будет пара key1 и key2. Значение этого будет key3, так что вы хотите сделать что-то вроде этого:

In [3]: new_rdd = rdd_arm.map(lambda x: (x['key1'] + ", " + x['key2'], x['key3'])) 

In [4]: new_rdd.collect() 
Out[4]: 
[('fruit, US', '1'), 
('fruit, US', '2'), 
('vegetable, US', '1'), 
('fruit, Japan', '3'), 
('vegetable, Japan', '3')] 

Затем мы хотим добавить значения клавиш, которые являются дубликатами, просто называя reduceByKey(), как это:

In [5]: new_rdd = new_rdd.reduceByKey(lambda a, b: int(a) + int(b)) 

In [6]: new_rdd.collect() 
Out[6]: 
[('fruit, US', 3), 
('fruit, Japan', '3'), 
('vegetable, US', '1'), 
('vegetable, Japan', '3')] 

и все готово!


Конечно, это может быть один-лайнер, как это:

new_rdd = rdd_arm.map(lambda x: (x['key1'] + ", " + x['key2'], x['key3'])).reduceByKey(lambda a, b: int(a) + int(b)) 
+1

Привет, gsamaras. Спасибо за последующие действия. –

2

Я решил это сам.

Мне пришлось создать ключ, содержащий несколько ключей, а затем добавить.

rdd_arm.map(lambda x : x[0] + ", " + x[1] , x[2]).reduceByKey(lambda a,b : a + b) 

Ниже был полезен вопрос.

How to group by multiple keys in spark?

+0

Позвольте мне сказать, что это не работает для меня, я получаю ошибки неопределенного имени, и после получения Получив их, я не смог заставить его работать. В результате я опубликовал новый ответ, надеюсь, вам понравится! Я поднял вопрос, хотя, поскольку это заставило меня практиковать! Благодаря! – gsamaras