2014-10-19 4 views
0

Я пытаюсь выполнить простой пример wordcount программно, но я не могу заставить код работать с кластером hadoop.mrjob bad --steps error using make_runner на кластере Hadoop

Работа в test_job.py:

from mrjob.job import MRJob 
import re 


WORD_RE = re.compile(r"[\w']+") 

class MRWordFreqCount(MRJob): 

    def mapper(self, _, line): 
     for word in WORD_RE.findall(line): 
      yield word.lower(), 1 

    def combiner(self, word, counts): 
     yield word, sum(counts) 

    def reducer(self, word, counts): 
     yield word, sum(counts) 

бегун mr_job_test.py:

from test_jobs import MRWordFreqCount 

def test_runner(in_args, input_dir): 
    tmp_output = [] 
    args = in_args + input_dir 
    mr_job = MRWordFreqCount(args.split()) 
    with mr_job.make_runner() as runner: 
     runner.run() 
     for line in runner.stream_output(): 
      tmp_output = tmp_output + [line] 
    return tmp_output 

if __name__ == '__main__': 
    input_dir = 'hdfs:///test_input/' 
    args = '-r hadoop ' 
    print test_runner(args, input_dir) 

я могу запустить этот код локально (с inline вариант), но на Hadoop я получил:

> Traceback (most recent call last): File "mr_job_tester.py", line 17, 
> in <module> 
>  print test_runner(args, input_dir) File "mr_job_tester.py", line 8, in test_runner 
>  runner.run() File "/usr/local/lib/python2.7/dist-packages/mrjob/runner.py", line 458, in 
> run 
>  self._run() File "/usr/local/lib/python2.7/dist-packages/mrjob/hadoop.py", line 239, in 
> _run 
>  self._run_job_in_hadoop() File "/usr/local/lib/python2.7/dist-packages/mrjob/hadoop.py", line 295, in 
> _run_job_in_hadoop 
>  for step_num in xrange(self._num_steps()): File "/usr/local/lib/python2.7/dist-packages/mrjob/runner.py", line 742, in 
> _num_steps 
>  return len(self._get_steps()) File "/usr/local/lib/python2.7/dist-packages/mrjob/runner.py", line 721, in 
> _get_steps 
>  raise ValueError("Bad --steps response: \n%s" % stdout) ValueError: Bad --steps response: 

ответ

0

(According to this) Путь mrjob представляет файл задания и выполняет его удаленно внутри mapper и редуктор, делает нужными нижеприведенные ниже строки, чтобы быть в файлах декларации работы:

if __name__ == "__main__": 
    MRWordFreqCount.run()