2016-11-29 9 views
0

Я хочу, чтобы эта карта уменьшала работу (код ниже) для вывода 10 самых рейтинговых продуктов. Он продолжает давать мне следующее сообщение об ошибке:MapReduce job, чтобы получить 10 лучших значений с использованием MRjob Python

it = izip (iterable, count (0, -1)) # decorate ТипError: аргумент izip # 1 должен поддерживать итерацию.

Я думаю, что это имеет отношение к nlargest Функция, которую я пытаюсь применить.

Любые указатели?

Спасибо!

from mrjob.job import MRJob 
from mrjob.step import MRStep 
from heapq import nlargest 


class MostRatedProduct(MRJob): 

def steps(self): 
    return [ 
     MRStep(mapper = self.mapper_get_ratings, 
       reducer = self.reducer_count_ratings), 
     MRStep(reducer = self.reducer_find_top10) 
    ] 


def mapper_get_ratings(self, _, line): 
    (userID, itemID, rating, timestamp) = line.split(',') 
    yield itemID, 1 

def reducer_count_ratings(self, itemID, ratingCount): 
    yield None, (sum(ratingCount), itemID) 

def top_10(self, ratingPair): 
    for ratingTotal, itemID in ratingPair: 
     top_rated = nlargest(10, ratingTotal) 
    for top_rated in ratingTotal: 
     return (ratingTotal, itemID) 

def reducer_find_top10(self, key, ratingPair): 
    ratingTotal, itemID = self.top_10(ratingPair) 
    yield ratingTotal, itemID 


if __name__ == '__main__': 
    MostRatedProduct.run() 

ответ

0

Я не использовал mrjob, но я использовал MapReduce на кластере AWS, чтобы найти значения из ранее. Вот мой код, который не использует heapq. Надеемся, вы сможете применить ту же концепцию к своему коду. Вот функция картографа

import sys, time 

def Parser(): 
    for line in sys.stdin: 
     line = line.strip('\n') 
     yield line.split() 


def mapper(): 
    counts = list(Parser()) 
    z = sorted(counts, key = lambda x: int(x[1]))[-10:] 
    print '\n'.join(map(lambda x: '\t'.join(x), z)) 


if __name__=='__main__': 
    mapper() 

Вот код для редуктора

import sys, operator, itertools 

def Parser(): 
    for line in sys.stdin: 
     yield tuple(line.strip('\n').split('\t')) 

def reducer(): 
    for key, pairs in itertools.groupby(Parser(), operator.itemgetter(0)): 
     counts = list(Parser()) 
     z = sorted(counts, key = lambda x: int(x[1]))[-10:] 
     print '\n'.join(map(lambda x: '\t'.join(x), z)) 

if __name__=='__main__': 
    reducer() 

я изменил его на выходе топ 10 слов. Имейте в виду, что это пример подсчета слов, когда я разбирал текстовый документ. Надеюсь, это поможет в некотором роде!