2016-10-20 8 views
-1

Это мой фрагмент кодаАльтернатива BufferedOutputStream?

@Override 
    protected RecordWriter<String, String> getBaseRecordWriter(
      FileSystem fs, JobConf job, String name, Progressable arg3) 
        throws IOException { 
     Path file2 = FileOutputFormat.getOutputPath(job); 
     String path = file2.toUri().getPath()+File.separator+ name; 
     FSDataOutputStream fileOut = new FSDataOutputStream(new BufferedOutputStream(new FileOutputStream(path, true),104857600)), null); 
     return new LineRecordWriter<String, String>(fileOut, "\t"); 
    } 

я использую Спарк 1.6.1 и в моем коде я использовал saveAsHadoopFile() метод, для которого я пишу класс OUTPUTFORMAT, полученный из org.apache.hadoop.mapred.lib.MultipleTextOutputFormat и я перезаписываю вышеуказанный метод.

В кластере он записывает поврежденные записи в выходные файлы. я думаю, что это из-за BufferedOutputStream в

FSDataOutputStream fileOut = new FSDataOutputStream(
       new BufferedOutputStream(new FileOutputStream(path, true),104857600)), null); 

Может у нас есть какие-либо альтернативы для bufferedOutputStream, так как он пишет, как только буфер заполняется.

Примечание: обновлен код. Приносим извинения за неудобства.

+2

В вашем коде не существует «BufferedOutputStream», не говоря уже о каких-либо доказательствах вашего убеждения в том, что он вызывает повреждение данных. Неясно, что вы спрашиваете, и вероятная проблема XY. – EJP

+0

Единственное повреждение, которое BufferedOutputStream может вызвать, - это усеченный файл, но только если вы не сбросили() или не закрыли() его. –

+0

Я обновил код. Я пытался использовать разные комбинации, поэтому получил неправильный. –

ответ

0

У меня проблема .. в кластере каждый рабочий попытается написать в том же (общем) файле, что и рабочие на разных машинах означают разные JVM и, следовательно, синхронизированный файл write wont работает здесь. вот почему коррупционные записи. Также я использовал NFS, что является важным фактором.