2017-01-31 5 views
3

У меня есть два файла в искровом кластере, foo.csv и bar.csv, оба с 4 столбцами и одинаковыми точными полями: time, user, url, category.pyspark: Отфильтровать один RDD на основе определенных столбцов другого RDD

Я хотел бы отфильтровать foo.csv по определенным столбцам bar.csv. В конце концов, мне нужны пары ключ/значение (пользователь, категория): [list, of, urls]. Например:

foo.csv: 
11:50:00, 111, www.google.com, search 
11:50:00, 222, www.espn.com, news 
11:50:00, 333, www.reddit.com, news 
11:50:00, 444, www.amazon.com, store 
11:50:00, 111, www.bing.com, search 
11:50:00, 222, www.cnn.com, news 
11:50:00, 333, www.aol.com, news 
11:50:00, 444, www.jet.com, store 
11:50:00, 111, www.yahoo.com, search 
11:50:00, 222, www.bbc.com, news 
11:50:00, 333, www.nytimes.com, news 
11:50:00, 444, www.macys.com, store 

bar.csv: 
11:50:00, 222, www.bbc.com, news 
11:50:00, 444, www.yahoo.com, store 

Если в результате:

{ 
(111, search):[www.google.com, www.bing.com, www.yahoo.com], 
(333, news): [www.reddit.com, www.aol.com, www.nytimes.com] 
} 

Другими словами, если (пользователь, категория) пара существует в bar.csv, я хотел бы, чтобы отфильтровать все линии в foo.csv если они имеют ту же самую точную (пользовательскую, категорию) пару. Таким образом, в приведенном выше примере я хотел бы удалить все строки в foo.csv с (222, news) и (444, store). В конце концов, после того, как я удалю строки, которые я хочу, мне нужен словарь с парами ключ/значение, например: (user, category): [list, of, urls].

Вот мой код:

fooRdd = sc.textFile("file:///foo.txt/") 
barRdd = sc.textFile("file:///bar.txt/") 


parseFooRdd= fooRdd.map(lambda line: line.split(", ")) 
parseBarRdd = barRdd.map(lambda line: line.split(", ")) 



# (n[1] = user_id, n[3] = category_id) --> [n[2] = url] 
fooGroupRdd = parseFooRdd.map(lambda n: ((n[1], n[3]), n[2])).groupByKey().map(lambda x: {x[0]: list(x[1])}) 
barGroupRdd = parseBarRdd.map(lambda n: ((n[1], n[3]), n[2])).groupByKey().map(lambda x: {x[0]: list(x[1])}) 

Приведенный выше код работает и получает наборы данных в формате я хочу:

(user_id, category): [all, urls, visited, by, user, in, that, category] 

Однако пара вопросов: 1) Я думаю, что это возвращает список словари w/только одна пара к/v и 2) я застрял в том, что делать дальше. Я знаю, что делать по-английски: получите ключи в barGroupRdd (кортежи) и удалите все строки в fooGroupRdd с одним и тем же ключом. Но я новичок в pyspark, и я чувствую, что есть команды, которыми я не пользуюсь. Я думаю, что мой код можно оптимизировать. Например, я не думаю, что мне нужно будет создать строку barGroupRdd, потому что все, что мне нужно от bar.csv (user_id, category) - мне не нужно создавать словарь. Я также думаю, что сначала должен отфильтровать, а затем создать словарь из результата. Любая помощь или совет оцениваются, спасибо!

ответ

2

Вы действительно очень близко.

Вместо этого для каждого РДУ:

fooGroupRdd = parseFooRdd.map(lambda n: ((n[1], n[3]),\ 
    n[2])).groupByKey().map(lambda x: {x[0]: list(x[1])}) 

ли это:

fooGroupRdd = parseFooRdd.map(lambda n: ((n[1], n[3]),\ 
    n[2])).groupByKey().map(lambda x: [(x[0]), list(x[1])]) 

Таким образом, вы можете получить доступ ключи с помощью метода в rdd.keys() и создать bar_keys список.

bar_keys = barGroupRdd.keys().collect() 

Тогда вы можете сделать то, что вы сказали. Отфильтруйте строки в fooGroupRdd, у которых есть ключ в bar_keys.

dict(fooGroupRdd.filter(lambda x: x[0] not in bar_keys)\ 
    .map(lambda x: [x[0], x[1]]).collect()) 

Окончательный результат выглядит следующим образом:

{('111', 'search'): ['www.google.com', 'www.bing.com', 'www.yahoo.com'], 
('333', 'news'): ['www.reddit.com', 'www.aol.com', 'www.nytimes.com']} 

Надежда, что помогает.

На ваш комментарий, я тоже задавался вопросом, является ли это наиболее эффективным методом.Изучая методы класса для RDD, вы найдете collectAsMap(), который работает как собирать, но возвращает словарь вместо списка. Тем не менее, при исследовании исходного кода метод просто делает то, что я сделал, поэтому, похоже, это лучший вариант.

+0

Спасибо, что сделал! Похоже, в API появилось много искровых функций, которые мне носят, я надеюсь, что это наиболее эффективное решение. Эта первая замена действительно помогла мне, опять же, высоко ценится. –