2013-12-10 2 views
4

У меня есть два процесса. Один из них записывает файлы в HDFS, а другой загружает эти файлы.чтение файла из HDFS только после его полной записи и закрытия

Первый процесс (тот, который записывает файл) используется:

private void writeFileToHdfs(byte[] sourceStream, Path outFilePath) { 
FSDataOutputStream out = null; 
try { 
    // create the file 
    out = getFileSystem().create(outFilePath); 
    out.write(sourceStream); 
} catch (Exception e) { 
    LOG.error("Error while trying to write a file to hdfs", e); 
} finally { 
    try { 
    if (null != out) 
     out.close(); 
    } catch (IOException e) { 
    LOG.error("Could not close output stream to hdfs", e); 
    } 
} 
} 

Второй процесс читает эти файлы для дальнейшей обработки. При создании файла он сначала создается, а затем заполняется содержимым. Этот процесс требует времени (несколько миллисекунд, но все же), и в течение этого времени второй процесс может забрать файл, прежде чем он будет полностью написан и закрыт.

Обратите внимание, что HDFS не сохраняет информацию о блокировке в namenode - так что там нет демона, который может проверить, заблокирован ли файл перед его доступом.

Интересно, как лучше всего решить эту проблему.

Вот мои мысли:

  1. Копирование файлов в новую папку, как только они полностью написаны и закрыты, то второй процесс будет читать эту новую папку.
  2. Переименование файла в соответствии с каким-либо соглашением об именах, когда оно полностью написано и закрыто, тогда второй процесс будет читать в соответствии с этим соглашением об именах.

У меня такое чувство, что я пытаюсь решить хорошо известную проблему, и я что-то пропустил. Существует ли наилучшая практика для такой проблемы?

+1

Замечание: если вы используете Java 7, вам не нужно все это делать, просто попробуйте с ресурсами –

+0

Почему бы вам не использовать сокеты для связи между вашими процессами? P1 может связываться с P2 и в то же время выгружать файл таким образом, чтобы, если он не подключен одновременно, P2 все равно может забрать позже ... – UmNyobe

+1

@RossDrew Я использую java7, но новичок в нем. Я проверю это, спасибо. – forhas

ответ

3

У этого сообщества есть некоторые вещи для этого. Только touch файл и сообщение об ошибке сообщают вам, если он уже заблокирован.

import org.apache.commons.io.* 

boolean fileAvail = false; 

try { 
    FileUtils.touch(fileName); //throws IOException if being used 
    fileAvail = true; 
} catch (IOException e) { 
    fileAvail = false; 
} 

(также) Попробуйте с ресурсами

В Java 7 вы можете использовать эту функцию на все, что реализует Closable как файлы, сокеты и соединения с базой данных, где он будет закрывать автоматически, как только объем блок попытка закончился делая это

try (FSDataOutputStream out = getFileSystem().create(outFilePath)) 
{ 
    //use out in here 
} 
//No finally required - catch is optional 

... сохраняет все, что дополнительный код

+0

инициирует ли это исключение, когда файл открыт с правами на чтение? – UmNyobe

+0

Я так не думаю. Я думаю, что он в основном проверяет, редактируется ли дата последнего изменения файлов и выбрасывается, если нет. –

+1

Взгляд перед прыжком: не используйте исключения для управления потоком. –

0

Вам действительно нужны два процесса? почему бы вам не создать два потока, а затем присоединиться к нему.

+0

Это не зависит от меня. – forhas

2

Вы говорите о two separate processes here or about two separate threads в том же процессе (JVM)?

В обоих случаях это consumer-producer problem, и вам не хватает some proper synchronization между производителем и потребителем. Если вы используете два потока в одном и том же процессе JVM, вы можете использовать BlockingQueue, чтобы передать произвольный токен, передаваемый файлами, от производителя к потребителю, например, например, имя файла после полного файла и его поток закрыт. Как только имя файла было найдено в очереди, потребитель может быть уверен, что файл был полностью написан и закрыт, потому что это подтверждено производителем.

Однако, если вы используете два разных процесса, проблему немного сложнее решить в зависимости от языка другого компонента и настройки сети, но вам придется реализовать какую-то очередь, которая может быть использована оба процесса, например, путем отправки некоторой информации через локальный сетевой порт, чтобы процессы знали о работе друг друга.

Независимо от того, что я всегда избегал бы перемещать файлы в файловой системе, поскольку это довольно дорогостоящая операция по сравнению с отправкой простых токенов. А также перемещение файлов arround может выдать файлы, которые еще не были полностью перемещен в зависимости от языка, который вы используете.

+0

Действительно, если мы говорим о 2 потоках, это простая проблема производителя-потребителя, с которой я знаком. Но я, говоря о 2 совершенно разных процессах, работает на разных машинах. – forhas

+0

Такая же проблема здесь: Отправить сообщение через открытый порт, где процесс производства подтверждает процесс потребления, который должен обрабатывать файл * X *. Я бы избежал вывода такого состояния из файловой системы. Таким образом, вы можете добавить новых потребителей и производителей в более позднем состоянии и добавить некоторую балансировку нагрузки. –