У меня есть два файла в искровом кластере, foo.csv
и bar.csv
, оба с 4 столбцами и одинаковыми точными полями: time, user, url, category
.pyspark: Отфильтровать один RDD на основе определенных столбцов другого RDD
Я хотел бы отфильтровать foo.csv
по определенным столбцам bar.csv
. В конце концов, мне нужны пары ключ/значение (пользователь, категория): [list, of, urls]. Например:
foo.csv:
11:50:00, 111, www.google.com, search
11:50:00, 222, www.espn.com, news
11:50:00, 333, www.reddit.com, news
11:50:00, 444, www.amazon.com, store
11:50:00, 111, www.bing.com, search
11:50:00, 222, www.cnn.com, news
11:50:00, 333, www.aol.com, news
11:50:00, 444, www.jet.com, store
11:50:00, 111, www.yahoo.com, search
11:50:00, 222, www.bbc.com, news
11:50:00, 333, www.nytimes.com, news
11:50:00, 444, www.macys.com, store
bar.csv:
11:50:00, 222, www.bbc.com, news
11:50:00, 444, www.yahoo.com, store
Если в результате:
{
(111, search):[www.google.com, www.bing.com, www.yahoo.com],
(333, news): [www.reddit.com, www.aol.com, www.nytimes.com]
}
Другими словами, если (пользователь, категория) пара существует в bar.csv
, я хотел бы, чтобы отфильтровать все линии в foo.csv
если они имеют ту же самую точную (пользовательскую, категорию) пару. Таким образом, в приведенном выше примере я хотел бы удалить все строки в foo.csv
с (222, news)
и (444, store)
. В конце концов, после того, как я удалю строки, которые я хочу, мне нужен словарь с парами ключ/значение, например: (user, category): [list, of, urls]
.
Вот мой код:
fooRdd = sc.textFile("file:///foo.txt/")
barRdd = sc.textFile("file:///bar.txt/")
parseFooRdd= fooRdd.map(lambda line: line.split(", "))
parseBarRdd = barRdd.map(lambda line: line.split(", "))
# (n[1] = user_id, n[3] = category_id) --> [n[2] = url]
fooGroupRdd = parseFooRdd.map(lambda n: ((n[1], n[3]), n[2])).groupByKey().map(lambda x: {x[0]: list(x[1])})
barGroupRdd = parseBarRdd.map(lambda n: ((n[1], n[3]), n[2])).groupByKey().map(lambda x: {x[0]: list(x[1])})
Приведенный выше код работает и получает наборы данных в формате я хочу:
(user_id, category): [all, urls, visited, by, user, in, that, category]
Однако пара вопросов: 1) Я думаю, что это возвращает список словари w/только одна пара к/v и 2) я застрял в том, что делать дальше. Я знаю, что делать по-английски: получите ключи в barGroupRdd
(кортежи) и удалите все строки в fooGroupRdd с одним и тем же ключом. Но я новичок в pyspark, и я чувствую, что есть команды, которыми я не пользуюсь. Я думаю, что мой код можно оптимизировать. Например, я не думаю, что мне нужно будет создать строку barGroupRdd
, потому что все, что мне нужно от bar.csv
(user_id, category) - мне не нужно создавать словарь. Я также думаю, что сначала должен отфильтровать, а затем создать словарь из результата. Любая помощь или совет оцениваются, спасибо!
Спасибо, что сделал! Похоже, в API появилось много искровых функций, которые мне носят, я надеюсь, что это наиболее эффективное решение. Эта первая замена действительно помогла мне, опять же, высоко ценится. –