2013-09-26 1 views
3

У нас есть система, которая получает архивы в указанном каталоге и на регулярной основе запускает задание mapreduce, которое открывает архивы и обрабатывает файлы внутри них. Чтобы избежать повторной обработки одних и тех же архивов в следующий раз, мы подключаемся к методу close() нашего RecReader, чтобы удалить его после чтения последней записи.Как удалить входные файлы после успешного mapreduce

Проблема с этим подходом (мы считаем) заключается в том, что если конкретное сопоставление терпит неудачу, следующий картограф, делающий еще одну попытку, обнаруживает, что исходный файл был удален считывателем записей с первого, и он бомбит , Мы думаем, что путь должен состоять в том, чтобы удержаться, пока все сопоставление и сокращение не завершится, а затем удалите входные архивы.

Это лучший способ сделать это?

Если да, то как мы можем получить список всех входных файлов, найденных системой из основной программы? (Мы не можем просто вычистить весь входной реж, новые файлы могут присутствовать)

т.е.

. . . 

    job.waitForCompletion(true); 

    (we're done, delete input files, how?) 

    return 0; 
} 
+0

Я думаю, вы должны определить, что такое входной файл. Существует несколько способов прохождения входного пути к Mapper. Более того, то, что является входным путем, часто определяется используемым InputFormat. Мы также можем читать файлы с помощью HDFS apis в Mapper. Это входной файл? – zsxwing

+0

Мы оценили ряд предложений, упомянутых здесь, но тот, который, кажется, наиболее обещает, включает в себя создание исторического PathFilter, который записывает все пути, которые он принимает в файл. При следующем запуске он открывает предыдущий файл (ы) и проверяет новые кандидаты для включения в набор ... и затем записывает другой файл истории. Более эффективный, чем поиск временных меток, поскольку в FileSystem не поступают вызовы. – Brian

ответ

0

Исходя из ситуации вы объясняющей я могу предложить следующее решение: - 1. Процесс мониторинга данных I.e мониторинг каталога, в который высаживаются архивы, должен выполняться отдельным процессом. Этот отдельный процесс может использовать некоторую таблицу метаданных, как в mysql, для размещения записей состояния на основе мониторинга каталогов. Записи метаданных также могут проверять наличие дубликатов. 2. Теперь, основываясь на записи метаданных, отдельный процесс может обрабатывать карту, уменьшающую выполнение задания. Некоторое состояние можно проверить в метаданных для запуска заданий.

+0

Хотя это, безусловно, возможно, добавление дополнительных процессов и таблиц, которые необходимо поддерживать, приносит больше усилий, чем упомянутые выше. Спасибо за ваш вклад! – Brian

1

Самый простой способ, вероятно, будет делать множественный ввод задания, читать каталог для файлов до вы запускаете задание и передаете их вместо каталога в задание (затем удаляйте файлы в списке после выполнения задания).

+0

На самом деле это было проще всего оценить, но, к сожалению, с более чем 200K входными файлами в нашем сценарии мастер потратил более часа, просто называя метод MultipleInputs.add (xx), а не дешевый метод для вызова. Это не потокобезопасно, так что вы не можете разделить усилия. Спасибо за совет все равно. – Brian

+0

вы не сказали, что у вас 200K входных файлов :) –

2

Пара комментариев.

  1. Я думаю, что этот дизайн страстно-подвержен. Что происходит, когда вы обнаруживаете, что кто-то развернул испорченный алгоритм в ваш кластер MR, и вам нужно заполнить архив за месяц? Они ушли. Что происходит, когда обработка занимает больше времени, чем ожидалось, и нужно начинать новую работу до того, как старая полностью будет выполнена? Слишком много файлов присутствует, и некоторые из них перерабатываются. Как насчет того, когда работа начинается, пока архив все еще находится в полете? И т. Д.

  2. Один выход из этой ловушки заключается в том, чтобы архивы переходили во вращающееся местоположение в зависимости от времени и либо сами продували записи, либо (в случае чего-то вроде S3) устанавливали политику хранения, которая позволяет определенное окно для операций. Кроме того, независимо от того, что делает обработка задней части карты, обработка может быть идемпотентной: обработка одной и той же записи дважды не должна отличаться от обработки ее один раз. Что-то подсказывает мне, что если вы уменьшите свой набор данных, это свойство будет трудно гарантировать.

  3. Как минимум, вы можете переименовать обработанные файлы, а не удалять их сразу же, и использовать выражение glob для определения ваших входных данных, которые не включают переименованные файлы. Как я уже говорил, по-прежнему существуют условия гонки.

  4. Вы можете использовать такую ​​очередь, как Amazon SQS, для записи доставки архива, и ваш InputFormat мог бы вытащить эти записи, а не перечислять папку архива при определении входных разделов. Но переработка или засыпка становятся проблематичными без дополнительной инфраструктуры.

  5. Все, что сказано, список разделов генерируется InputFormat. Напиши декоратор вокруг этого, и вы можете хранить список разделов, где бы вы ни захотели для использования мастером после выполнения задания.

+0

# 3 является самым простым, но если мы используем такой магазин, как S3, то переименование на самом деле является нетривиальной копией к новому имени, которая замедляет работу. Сначала у меня был вариант №5, но я предпочитаю # 2, и я надеюсь, что скоро это выучит. Большое спасибо за понимание. – Brian

0

Я думаю, вы должны использовать Apache Oozie для управления рабочим процессом. На веб-сайте Oozie (bolding is mine):

Oozie - это планировщик рабочего процесса для управления заданиями Apache Hadoop.

...

Координатор Oozie рабочих мест рецидивирующие работы Oozie Workflow запускаемые по времени (частоты) и данных Availabilty.