У меня есть некоторые tar-файлы на HDFS. Моя цель - извлечь эти файлы & сохраненных извлеченных файлов на HDFS.Получение исключения: входной буфер ввода IOException является закрытым исключением при извлечении файла tar
для примера:
Это моя структура входного каталога (HDFS).
Path : /data/160823 -->
--------
| 160823 |
--------
|
| --- 00
|----- xyz.tar
|----- xyz2.tar
| --- 01
|----- xyz3.tar
|----- abc2.tar
| --- 02
|----- abc3.tar
|----- abc4.tar
.
.
.
--- 23
|----- pqr.tar
|----- pqr2.tar
Ожидаемый результат будет:
--------
| Output |
--------
|
|----- xyz.gz
|----- xyz2.gz
Мой код извлечения этих битуминозных файлов и хранить эти файлы в пути на HDFS.
Так что я могу извлечь первый файл .tar &, способный хранить вывод на HDFS, но после этого, читая следующий .tar-файл, я получаю это исключение.
java.io.IOException: input buffer is closed
at org.apache.commons.compress.archivers.tar.TarBuffer.readRecord(TarBuffer.java:190)
at org.apache.commons.compress.archivers.tar.TarArchiveInputStream.getRecord(TarArchiveInputStream.java:302)
at org.apache.commons.compress.archivers.tar.TarArchiveInputStream.getNextTarEntry(TarArchiveInputStream.java:230)
at com.lsr.TarMapper.call(TarMapper.java:53)
at com.lsr.TarMapper.call(TarMapper.java:1)
at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:129)
at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:129)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:126)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Это мой фрагмент кода,
import java.util.ArrayList;
import java.util.List;
import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStream;
import java.net.URI;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.spark.api.java.function.FlatMapFunction;
import com.utils.FileWrapper;
public class TarMapper implements FlatMapFunction<String, String>{
public Iterable<String> call(String arg0) throws Exception {
System.out.println("Arg0 : "+arg0);
List<String> untarFile = new ArrayList<String>();
FileSystem fileSystem = LTar.fs;
FSDataInputStream fsin = null;
TarArchiveInputStream tarin = null;
OutputStream outstr = null;
TarArchiveEntry tarentry = null;
FSDataOutputStream fsDataOutputStream = null;
Path outputPath = null;
try{
fileSystem = FileSystem.get(LTar.conf);
fsin = fileSystem.open(new Path(arg0));
tarin = new TarArchiveInputStream(fsin);
tarentry = tarin.getNextTarEntry();
while (tarentry != null) {
if (!tarentry.isDirectory()) {
System.out.println("TAR ENTRY : "+tarentry);
outputPath = new Path("/data/tar/"+tarentry.getName().substring(2));
fsDataOutputStream = fileSystem.create(outputPath);
System.out.println("Name : "+tarentry.getName()+"Other : ");
IOUtils.copyBytes(tarin, fsDataOutputStream, LTar.conf);
}
tarentry = tarin.getNextTarEntry();
}
}catch (Exception e) {
e.printStackTrace();
} finally {
if (tarin != null) {
tarin.close();
}
if (fsin != null) {
fsin.close();
}
if (fileSystem != null) {
fileSystem.close();
}
if(outstr !=null){
outstr.close();
}
if(fsDataOutputStream != null){
fsDataOutputStream.close();
}
}
return untarFile;
}
}
Просьба представить свое предложение по этому вопросу.
Это работает! Благодаря @EJP, я просматриваю документацию пакета IOUtils, нашел другую функцию copyBytes() с логическим аргументом, с этим я могу извлечь все .tar-файлы. Вот синтаксис для этого. copyBytes (InputStream in, OutputStream out, Конфигурация conf, boolean close) –