2013-05-16 4 views
4

Мне не хватает чего-то очевидного в библиотеке Yelp's mrjob job. Настройка класса MRJob почти тривиально проста. Запуск его над file или stdin также так. Но как я могу изменить вход в задание из файла локально или в s3, например, в ведро s3?Как я могу использовать имена объектов s3 в качестве входов в MapManager MRJob, но не сами объекты s3?

Нечто подобное. Предположим, что я хотел, чтобы сосчитать все предметы в моем S3 ведро, начинающиеся со строки «Foo»:

import re 

class MRCountS3Objects(MRJob): 

    define mapper(self, _, botoS3Key): 
     if re.match('^foo', botoS3Key.name): 
      yield 'foo', 1 

    define reduce(self, name, occurrences): 
     yield name, sum(occurrences) 

Это очень надуманный пример, но вы, вероятно, получите мой дрейф. Как я могу сообщить MRJob о работе над потоком объектов s3, игнорируя содержимое объектов? Я видел S3Filesystem.get_s3_keys() method, который получает мне ровно тот поток, в котором я нуждаюсь, но я не уверен, куда идти оттуда.

ответ

4

Построен как минимум один способ достижения этого. Ваш MRJob имеет атрибут stdin, который может быть назначен любому итератору, а затем вы можете запускать работу программно. Этот код, например, должен работать над именами my-bucket:

from mrjob.job import MRJob 
from mrjob.emr import EMRJobRunner 

class MRS3KeyProcessor(MRJob): 
    # Do some MRJob stuff. 
    ... 

def s3_name_generator(bucket): 
    """Generator that returns boto.s3.Key names. 
    """ 
    # Could also use raw boto here. 
    emr = EMRJobRunner() 
    key_stream = emr.fs.get_s3_keys(bucket) 
    for key in key_stream: 
     yield key.name 

def main(): 
    # The '-' argument signifies that we use stdin. 
    mr_job = MRCountS3Objects(['--runner', 'inline', '-']) 
    stdin = s3_name_generator('my-bucket') 
    mr_job.stdin = stdin 
    results = [] 
    with mr_job.make_runner() as runner: 
     runner.run() 
     for line in runner.stream_output(): 
      key, value = mr_job.parse_output_line(line) 
      results.append((key, value)) 
    print(results) 

if __name__ == '__main__': 
    main()