2013-09-24 5 views
0

Я пытаюсь запустить mrjob на EMR Amazon. Я тестировал работу локально, используя встроенный бегун, но он не работает при работе на Amazon. Я сузил неудачу до моей зависимости от внешнего файла данных zip_codes.txt. Если я работаю без этой зависимости, используя данные жесткого кодирования, это работает отлично.Как файлы данных должны быть включены в mrjob в EMR?

Я попытался включить необходимый файл данных, используя аргумент файла загрузки. Когда я смотрю на S3, файл делал это там, но, очевидно, что-то идет не так, так что я не могу получить доступ к нему локально.

enter image description here

Вот мой mrjob.conf файл:

runners: 
    emr: 
    aws_access_key_id: FOOBARBAZQUX 
    aws_secret_access_key: IAMASECRETKEY 
    aws_region: us-east-1 
    ec2_key_pair: mapreduce 
    ec2_key_pair_file: $ENV/keys/mapreduce.pem 
    ssh_tunnel_to_job_tracker: true 
    ssh_tunnel_is_open: true 
    cleanup_on_failure: ALL 
    cmdenv: 
     TZ: America/Los_Angeles 

Это мой MR_zip.py файл.

from mrjob.job import MRJob 
import mrjob 
import csv 

def distance(p1, p2): 
    # d = ...  
    return d 

class MR_zip(MRJob): 
    OUTPUT_PROTOCOL = mrjob.protocol.JSONProtocol 
    zip_codes = {int(zip_code): (float(latitude), float(longitude)) for zip_code, latitude, longitude in csv.reader(open("zip_codes.txt", "r"))} 

    def mapper(self, _, line): 
     zip_code_1, poi = line.split(",") 
     zip_code_1 = int(zip_code_1) 
     lat1, lon1 = self.zip_codes[zip_code_1] 
     for zip_code_2, (lat2, lon2) in self.zip_codes.items(): 
      d = distance((lat1, lon1), (lat2, lon2)) 
      yield zip_code_2, (zip_code_1, poi, d) 

    def reducer(self, zip_code_1, ds): 
     result = {} 
     for zip_code_2, poi, d in ds: 
      if poi not in result: 
       result[poi] = (zip_code_2, d) 
      elif result[poi][1] > d: 
       result[poi] = (zip_code_2, d) 
     yield zip_code_1, result 

if __name__ == '__main__': 
    MR_zip.run() 

И, наконец, я запустить его с помощью следующей команды:

python MR_zip.py -r emr --conf mrjob.conf --file zip_codes.txt < poi.txt 

Где zip_codes.txt выглядит следующим образом:

... 
62323,39.817702,-90.66923 
62324,39.988988,-90.94976 
62325,40.034398,-91.16278 
62326,40.421857,-90.80333 
... 

И poi.txt выглядит следующим образом:

... 
210,skate park 
501,theatre 
29001,theatre 
8001,knitting club 
20101,food bank 
... 

ответ

1

Overv МЭН

Существовало две ошибки в своем коде:

  1. Код инициализации для шага должен быть в инициализаторе Стадии в
  2. По умолчанию ОГО использует Python 2.6, исключающая словарь постижений среди прочего

Шаг инициализации

Ever y имеет соответствующий метод инициализации. Например, mapper имеет mapper_init, который может использоваться для инициализации данных, используемых в картографе. Функции reducer и combiner имеют аналогичные методы инициализации. Если вы используете функцию steps, чтобы определить свои собственные шаги, вы также можете определить, какую функцию инициализации вы используете. Подробнее об инициализаторах here.

Остерегайтесь Python версии

На сегодняшний день, ЭМИ использует Python версии 2.6.6 по умолчанию. Поэтому любые зависимости от более поздних версий могут выполняться локально, но возникают проблемы с EMR.

Исправление

Чтобы восстановить код, указанный выше, необходимо удалить строку, определяющую zip_codes в MR_zip.py

zip_codes = {int(zip_code): (float(latitude), float(longitude)) for zip_code, latitude, longitude in csv.reader(open("zip_codes.txt", "r"))} 

и вместо того, чтобы определить его внутри mapper_init без использования словаря постижений.

def mapper_init(self): 
    self.zip_codes = {} 
    for zip_code, latitude, longitude in csv.reader(open("zip_codes.txt", "r")): 
     self.zip_codes[int(zip_code)] = (float(latitude), float(longitude)) 

Другие файлы и командная строка остаются неизменными.

3

Кроме того, вы можете найти полезный MRJob.add_file_option рутинный. Например, указав

self.add_file_option('--config-file', dest='config_file', 
    default=None, help='file with labels', action="append") 

вы можете ссылаться на загруженные файлы с помощью self.options.config_file списка путей.