7

Все отлично работает локально, когда я делаю следующим образом:Разбитой Ошибки трубы вызывает потоковый Elastic MapReduce работу на AWS на провал

cat input | python mapper.py | sort | python reducer.py 

Однако, когда я запускаю работу потокового MapReduce на AWS Elastic MapReduce, работа не завершена успешно. mapper.py проходит часть пути (я знаю это из-за написания до stderr по пути). Картографа прерывается ошибкой «сломанной трубы», который я смог извлечь из Syslog в попытке задания после того, как он не:

java.io.IOException: Broken pipe 
    at java.io.FileOutputStream.writeBytes(Native Method) 
    at java.io.FileOutputStream.write(FileOutputStream.java:282) 
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:105) 
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:65) 
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:109) 
    at java.io.DataOutputStream.write(DataOutputStream.java:90) 
    at org.apache.hadoop.streaming.io.TextInputWriter.writeUTF8(TextInputWriter.java:72) 
    at org.apache.hadoop.streaming.io.TextInputWriter.writeValue(TextInputWriter.java:51) 
    at org.apache.hadoop.streaming.PipeMapper.map(PipeMapper.java:109) 
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50) 
    at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:36) 
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:441) 
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:377) 
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255) 
    at java.security.AccessController.doPrivileged(Native Method) 
    at javax.security.auth.Subject.doAs(Subject.java:396) 
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059) 
    at org.apache.hadoop.mapred.Child.main(Child.java:249) 


2012-03-26 07:19:05,400 WARN org.apache.hadoop.streaming.PipeMapRed (main): java.io.IOException: Broken pipe 
    at java.io.FileOutputStream.writeBytes(Native Method) 
    at java.io.FileOutputStream.write(FileOutputStream.java:282) 
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:105) 
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:65) 
    at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:123) 
    at java.io.DataOutputStream.flush(DataOutputStream.java:106) 
    at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:579) 
    at org.apache.hadoop.streaming.PipeMapper.map(PipeMapper.java:124) 
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50) 
    at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:36) 
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:441) 
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:377) 
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255) 
    at java.security.AccessController.doPrivileged(Native Method) 
    at javax.security.auth.Subject.doAs(Subject.java:396) 
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059) 
    at org.apache.hadoop.mapred.Child.main(Child.java:249) 

2012-03-26 07:19:05,400 INFO org.apache.hadoop.streaming.PipeMapRed (main): mapRedFinished 
2012-03-26 07:19:05,400 WARN org.apache.hadoop.streaming.PipeMapRed (main): java.io.IOException: Bad file descriptor 
    at java.io.FileOutputStream.writeBytes(Native Method) 
    at java.io.FileOutputStream.write(FileOutputStream.java:282) 
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:105) 
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:65) 
    at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:123) 
    at java.io.DataOutputStream.flush(DataOutputStream.java:106) 
    at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:579) 
    at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:135) 
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:57) 
    at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:36) 
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:441) 
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:377) 
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255) 
    at java.security.AccessController.doPrivileged(Native Method) 
    at javax.security.auth.Subject.doAs(Subject.java:396) 
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059) 
    at org.apache.hadoop.mapred.Child.main(Child.java:249) 

2012-03-26 07:19:05,400 INFO org.apache.hadoop.streaming.PipeMapRed (main): mapRedFinished 
2012-03-26 07:19:05,405 INFO org.apache.hadoop.streaming.PipeMapRed (Thread-13): MRErrorThread done 
2012-03-26 07:19:05,408 INFO org.apache.hadoop.mapred.TaskLogsTruncater (main): Initializing logs' truncater with mapRetainSize=-1 and reduceRetainSize=-1 
2012-03-26 07:19:05,519 INFO org.apache.hadoop.io.nativeio.NativeIO (main): Initialized cache for UID to User mapping with a cache timeout of 14400 seconds. 
2012-03-26 07:19:05,520 INFO org.apache.hadoop.io.nativeio.NativeIO (main): Got UserName hadoop for UID 106 from the native implementation 
2012-03-26 07:19:05,522 WARN org.apache.hadoop.mapred.Child (main): Error running child 
java.io.IOException: log:null 
R/W/S=7018/3/0 in:NA [rec/s] out:NA [rec/s] 
minRecWrittenToEnableSkip_=9223372036854775807 LOGNAME=null 
HOST=null 
USER=hadoop 
HADOOP_USER=null 
last Hadoop input: |null| 
last tool output: |text/html 1| 
Date: Mon Mar 26 07:19:05 UTC 2012 
java.io.IOException: Broken pipe 
    at java.io.FileOutputStream.writeBytes(Native Method) 
    at java.io.FileOutputStream.write(FileOutputStream.java:282) 
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:105) 
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:65) 
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:109) 
    at java.io.DataOutputStream.write(DataOutputStream.java:90) 
    at org.apache.hadoop.streaming.io.TextInputWriter.writeUTF8(TextInputWriter.java:72) 
    at org.apache.hadoop.streaming.io.TextInputWriter.writeValue(TextInputWriter.java:51) 
    at org.apache.hadoop.streaming.PipeMapper.map(PipeMapper.java:109) 
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50) 
    at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:36) 
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:441) 
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:377) 
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255) 
    at java.security.AccessController.doPrivileged(Native Method) 
    at javax.security.auth.Subject.doAs(Subject.java:396) 
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059) 
    at org.apache.hadoop.mapred.Child.main(Child.java:249) 


    at org.apache.hadoop.streaming.PipeMapper.map(PipeMapper.java:125) 
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50) 
    at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:36) 
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:441) 
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:377) 
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255) 
    at java.security.AccessController.doPrivileged(Native Method) 
    at javax.security.auth.Subject.doAs(Subject.java:396) 
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059) 
    at org.apache.hadoop.mapred.Child.main(Child.java:249) 
2012-03-26 07:19:05,525 INFO org.apache.hadoop.mapred.Task (main): Runnning cleanup for the task 
2012-03-26 07:19:05,526 INFO org.apache.hadoop.mapred.DirectFileOutputCommitter (main): Nothing to clean up on abort since there are no temporary files written 

Вот mapper.py. Обратите внимание, что я пишу в стандартный вывод, чтобы обеспечить себя с информацией для отладки:

#!/usr/bin/env python 

import sys 
from warc import ARCFile 

def main(): 
    warc_file = ARCFile(fileobj=sys.stdin) 
    for web_page in warc_file: 
     print >> sys.stderr, '%s\t%s' % (web_page.header.content_type, 1) #For debugging 
     print '%s\t%s' % (web_page.header.content_type, 1) 
    print >> sys.stderr, 'done' #For debugging 
if __name__ == "__main__": 
    main() 

Вот что я получаю в STDERR за попытку задачи, когда mapper.py запускается:

text/html 1 
text/html 1 
text/html 1 

В основном, цикл проходит через 3 раза, а затем резко останавливается, если python не выбрасывает какую-либо ошибку. (Примечание: должен выводить тысячи строк). Даже неперехваченное исключение должно появиться в stderr.

Поскольку MapReduce полностью работает на моем локальном компьютере, я предполагаю, что это проблема с тем, как Hadoop имеет дело с выходом, который я печатаю с mapper.py. Но я не знаю, что это за проблема.

ответ

9

Ваш потоковый процесс (ваш скрипт на Python) заканчивается преждевременно. Это может быть сделано с учетом того, что ввод данных завершен (например, интерпретация EOF) или проглоченное исключение. В любом случае, Hadoop пытается подключиться через STDIN к вашему сценарию, но поскольку приложение завершилось (и, следовательно, STDIN больше не является допустимым файловым дескриптором), вы получаете ошибку BrokenPipe. Я бы предложил добавить трассировки stderr в вашем скрипте, чтобы увидеть, какая строка ввода вызывает проблему. Счастливое кодирование,

-Geoff

+4

babonk, может вы предоставляете подробную информацию о том, как вы решили свою проблему, используя этот совет? –

+0

То же самое. У меня, очевидно, есть аналогичная ошибка: http: // stackoverflow.com/questions/18556270/aws-elastic-mapreduce-doesnt-seem-to-be-correct-conversion-the-streaming-to-j, и учитывая, что он работает при передаче по каналам, я не понимаю, как " исправить "его для потоковой передачи. – Mittenchops

1

У меня нет опыта работы с Hadoop на AWS, но у меня был такой же ошибка на регулярном Hadoop кластере - и в моем случае проблема была, как я начал питон -mapper ./mapper.py -reducer ./reducer.py работал, но -mapper python mapper.py Didn» т.

Вы также, кажется, используете нестандартный пакет python warc, вы отправляете необходимые файлы в streamjob? -cacheFiles или -cacheArchive может быть полезно.

+0

Как вы включаете нестандартные пакеты python? В частности, эластичное отображение AWS-атрибутов не создает таких параметров, как кеш-файлы. – Mittenchops

6

Это сказано выше, но позвольте мне попытаться пояснить - вы должны блокировать stdin, даже если вам это не нужно! Это не так же, как Linux-каналы, поэтому не позволяйте этому обмануть вас. Что происходит, интуитивно, Streaming поддерживает ваш исполняемый файл, а затем говорит: «Подождите здесь, пока я пойду для вас». Если ваш исполняемый файл останавливается по какой-либо причине до того, как Streaming отправит вам 100% ввода, Streaming говорит: «Эй, где же этот исполняемый файл вышел, что я встал? ... Хммм ... труба сломана, позвольте мне поднять это исключение !» Итак, вот некоторые питона коды, все это делает, что делает кошка, но вы можете заметить, этот код не будет выход, пока весь вход обрабатывается, и это ключевой момент:

#!/usr/bin/python 
import sys 

while True: 
    s = sys.stdin.readline() 
    if not s: 
     break 
    sys.stdout.write(s) 
+1

Я получал эту ошибку, потому что я ничего не делал с помощью ввода. Я добавил этот код (хотя он ничего не делает для меня), и ошибка пошла. – schoon

 Смежные вопросы

  • Нет связанных вопросов^_^