2016-12-30 5 views
0


Приветствия Все,
Мне нужна помощь в разрешении проблемы передачи файлов. Я реализовал Netty-код для переноса одного двоичного файла размером 10 Мбайт с одного хоста (Node 0) на другой хост (Node 1), но передается только 8.5 КБ файла, и мне сложно понять, почему. Я использую ChunkWriteHandler для отправки блоков 1 Мбайт файла за один раз через ChunkedNioFile (см. Код ниже). Кроме того, я попытался передать файлы размером более 1 МБ, такие как 100 МБ, 500 МБ и 1 ГБ и передано только 8.5 КБ файла. Если я уменьшу размер блока, указанный в ChunkedNioFile, с 1 МБ до 512 КБ или ниже, тогда будет передано 17 КБ, что вдвое больше, чем предыдущие передачи файлов. Кроме того, я попытался использовать только ChunkedFile, но получил те же результаты передачи. Я могу успешно перенести и получить заголовки файлов: имя файла, размер файла (длина) и смещение файла (где начать чтение или запись), но всего несколько КБ фактического файла. Может кто-нибудь сказать мне, что происходит, и как я могу решить эту проблему? (Ниже приведен код).Понимание и разрешение проблемы передачи файлов с использованием Netty

Спасибо,

Код Установка:

  • FileSenderInitializer.java
  • FileSenderHandler.java
  • FileSender.java
  • FileReceiverInitializer.java
  • FileReceiverHandler. java
  • FileReceiver.java

FileSenderInitializer.java - Инициализировать трубопровод канала с обработчиками канала

общественного класса FileSenderInitializer расширяет ChannelInitializer {

 @Override 
     public void initChannel(SocketChannel ch) throws Exception { 
     ch.pipeline().addLast(
     //new LengthFieldPrepender(8), 
     new ChunkedWriteHandler(), 
     new FileSenderHandler()); 
     } 
     } 

FileSenderHandler.java - Посылает информация заголовка файла - имя файла, смещение, длина, а затем фактический файл

@Override 
public void channelActive(ChannelHandlerContext ctx) throws Exception { 
try { 
String fileRequest = "ftp Node0/root/10MB_File.dat Node1/tmp/10MB_File_Copy.dat"; 

//Source File to send/transfer to the Destination Node 
String theSrcFilePath = "/root/10MB_File.dat"; 

//File Name to write on the destination node, once the file is received 
String theDestFilePath = "/tmp/10MB_File_Copy.dat"; 

//Get the source file to send 
File theFile = new File(theSrcFilePath); 
FileChannel theFileChannel = new RandomAccessFile(theFile, "r").getChannel(); 

//Get the length of the file 
long fileLength = theFileChannel.size(); 
//Get the offset 
long offSet = 0; 

//Copy the offset to the ByteBuf 
ByteBuf offSetBuf = Unpooled.copyLong(offSet); 
//Copy the file length to the ByteBuf 
ByteBuf fileLengthBuf = Unpooled.copyLong(fileLength); 

//Get the Destination Filename (including the file path) in Bytes 
byte[] theDestFilePathInBytes = theDestFilePath.getBytes(); 
//Get the length of theFilePath 
int theDestSize = theDestFilePathInBytes.length; 
//Copy the Dest File Path length to the ByteBuf 
ByteBuf theDestSizeBuf = Unpooled.copyInt(theDestSize); 
//Copy the theDestFilePathInBytes to the Byte Buf 
ByteBuf theDestFileBuf = Unpooled.copiedBuffer(theDestFilePathInBytes); 

//Send the file Headers: FileName Length, the FileName, the Offset and the file length 
ctx.write(theDestSizeBuf); 
ctx.write(theDestFileBuf); 
ctx.write(offSetBuf); 
ctx.write(fileLengthBuf); 
ctx.flush(); 

//Send the 10MB File in 1MB chunks as specified by the following chunk size (1024*1024*1) 
ctx.write(new ChunkedNioFile(theFileChannel, offSet, fileLength, 1024 * 1024 * 1)); 
ctx.flush(); 

}catch(Exception e){ 
System.err.printf("FileSenderHandler: Channel Active: Error: "+e.getMessage()); 
e.printStackTrace(); 
} 
} //End channelActive 

FileSender.java - бутстрапирования канал и связывает этот клиент/хост для другого хоста

public static void main(String[] args) throws Exception { 
    // Configure the client/ File Sender 
    EventLoopGroup group = new NioEventLoopGroup(); 
    try { 
    Bootstrap b = new Bootstrap(); 
    b.group(group) 
    .channel(NioSocketChannel.class) 
    .option(ChannelOption.TCP_NODELAY, true) 
    .handler(new FileSenderInitializer()); 

    // Start the client. 
    ChannelFuture f = b.connect(HOST, PORT).sync(); 

    // Wait until the connection is closed. 
    //f.channel().closeFuture().sync(); 
    } finally { 
    // Shut down the event loop to terminate all threads. 
    group.shutdownGracefully(); 
    } 
    } 
} 

FileReceiverInitializer.java - Инициализировать трубопровод канала с обработчиками канала

public class FileReceiverInitializer extends ChannelInitializer<SocketChannel> { 

public FileReceiverInitializer(){ 

} 

@Override 
public void initChannel(SocketChannel ch) throws Exception { 
ch.pipeline().addLast( 
    //Read in 1MB data at a time (which is the max frame length), length field offset starts at 0, length of the length field is 8 bits, length adjustment is 0, strip the 8 bits representing the length field from the frame 
//new LengthFieldBasedFrameDecoder(1024*1024*1, 0, 8, 0, 8), 
new FileReceiverHandler()); 
} 
} 

FileReceiverHandler.java - получает информацию заголовка файла - Имя файла, смещение, длину, а затем сам файл

public void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { 
while (msg.readableBytes() >= 1){ 
    //Read in the size of the File Name and it's directory path 
    if (!fileNameStringSizeSet) { 
    fileNameStringSizeBuf.writeBytes(msg, ((fileNameStringSizeBuf.writableBytes() >= msg.readableBytes()) ? msg.readableBytes() : fileNameStringSizeBuf.writableBytes())); //INT_SIZE = 4 & LONG_SIZE = 8 (the byte size of an int and long) 
    if (fileNameStringSizeBuf.readableBytes() >= INT_SIZE) { 
     fileNameStringSize = fileNameStringSizeBuf.getInt(fileNameStringSizeBuf.readerIndex());//Get Size at index = 0; 
     fileNameStringSizeSet = true; 
    //Allocate a byteBuf to read in the actual file name and it's directory path 
     fileNameStringBuf = ctx.alloc().buffer(fileNameStringSize); 
    } 
    } else if (!readInFileNameString) { 
    //Read in the actual file name and it's corresponding directory path 
    fileNameStringBuf.writeBytes(msg, ((fileNameStringBuf.writableBytes() >= msg.readableBytes()) ? msg.readableBytes() : fileNameStringBuf.writableBytes())); 
    if (fileNameStringBuf.readableBytes() >= fileNameStringSize) { 
     readInFileNameString = true; 
     //convert the data in the fileNameStringBuf to an ascii string 
     thefileName = fileNameStringBuf.toString(Charset.forName("US-ASCII")); 

     //Create file 
     emptyFile = new File(thefileName); //file Name includes the directory path 
     f = new RandomAccessFile(emptyFile, "rw"); 
     fc = f.getChannel(); 
    } 
}else if (!readInOffset) { 
    offSetBuf.writeBytes(msg, ((offSetBuf.writableBytes() >= msg.readableBytes()) ? msg.readableBytes() : offSetBuf.writableBytes())); 
    if (offSetBuf.readableBytes() >= LONG_SIZE) { 
    currentOffset = offSetBuf.getLong(offSetBuf.readerIndex());//Get Size at index = 0; 
    readInOffset = true; 
    } 

} else if (!readInFileLength) { 
    fileLengthBuf.writeBytes(msg, ((fileLengthBuf.writableBytes() >= msg.readableBytes()) ? msg.readableBytes() : fileLengthBuf.writableBytes())); 
    //LONG_SIZE = 8 
    if (fileLengthBuf.readableBytes() >= LONG_SIZE) { 
    fileLength = fileLengthBuf.getLong(fileLengthBuf.readerIndex());//Get Size at index = 0; 
    remainingFileLength = fileLength; 
    readInFragmentLength = true; 
    } 
} else { 
    if (!readInCompleteFile) { 
    if (msg.readableBytes() < remainingFileLength) { 
     if (msg.readableBytes() > 0) { 
     currentFileBytesWrote = 0 
     while (msg.readableBytes >= 1){ 
      int fileBytesWrote = fc.write(msg.nioBuffer(msg.readerIndex(), msg.readableBytes()), currentOffset); 
      currentOffset += fileBytesWrote; 
      remainingFileLength -= fileBytesWrote; 
      msg.readerIndex(msg.readerIndex + fileBytesWrote); 
     } 
     } 
    } else { 
     int remainingFileLengthInt = (int) remainingFileLength; 
     while (remainingFileLength >= 1){ 
     int fileBytesWrote = fc.write(msg.nioBuffer(msg.readerIndex(), remainingFileLengthInt), currentOffset); 

     currentOffset += fileBytesWrote; 
     remainingFileLength -= fileBytesWrote; 
     remainingFileLengthInt-= fileBytesWrote; 
     msg.readerIndex(msg.readerIndex + fileBytesWrote); 
     } 

     //Set readInCompleteFile to true 
     readInCompleteFile = true; 

    } 
    }//End else if file chunk 
    }//End Else 
}//End While 
}//End Read Method 

FileReceiver.Java - бутстрапирования на сервер и принимает соединения

public static void main(String[] args) throws Exception { 
// Configure the server 
EventLoopGroup bossGroup = new NioEventLoopGroup(1); 
EventLoopGroup workerGroup = new NioEventLoopGroup(); 
try { 
ServerBootstrap b = new ServerBootstrap(); 
b.group(bossGroup, workerGroup) 
.channel(NioServerSocketChannel.class) 
.handler(new LoggingHandler(LogLevel.INFO)) 
.childHandler(new FileReceiverInitializer()) 
.childOption(ChannelOption.AUTO_READ, true) 
.bind(LOCAL_PORT).sync().channel().closeFuture().sync(); 
} finally { 
bossGroup.shutdownGracefully(); 
workerGroup.shutdownGracefully(); 
} 
} 

-- 

ответ

0

Возможно, я ошибаюсь, но следующий странно мне:

 int fileBytesWrote = fc.write(msg.nioBuffer(msg.readerIndex(), msg.readableBytes()), currentOffset); 
     currentOffset += fileBytesWrote; 
     remainingFileLength -= fileBytesWrote; 
     msg.readerIndex(msg.readerIndex + fileBytesWrote); 
     // msg.readerIndex (or msg.readerIndex() ?) changed already 

Вы можете значение резервного readerIndex() перед выполнением этого задания.

Имея несколько килобайта, кажется, связан с: - либо (? Только первой один) вы не потребляете все пакеты, как вы хотели бы - вы читаете это не правильно (пропуская несколько байт, как я подозреваю, что в коде как показано выше)

Не могли бы вы проследить каждую операцию чтения (на стороне сервера)? Это может помочь вам (зная, сколько байтов вы получили, сколько вы написали, что такое readerIndex/readableBytes/offset, например).

0

Другая причина может быть: клиентская сторона, вы сразу же отключите группу после подключения. это может быть причиной, так как клиент может «прервать» передачу, поэтому у сервера не будет полной передачи?

+0

Спасибо, Фредерик –

0

Проблема заключалась в том, что когда основное приложение FileSender.java завершило выполнение его кода, оно прекратилось, в результате чего FileSenderHandler завершится. Однако, чтобы заблокировать основное приложение FileSender.java от завершения, я использовал следующий оператор: f.channel(). CloseFuture(). Sync() ;. в которой f является каналом ChannelFuture, переданным от подключения к серверу через вызов: b.connect (HOST, PORT) .sync(); Это позволит сохранить FileSender и разрешить файловому файлу передавать всю информацию без раннего завершения.

Тем не менее, мои новые вопросы: как приложение может закрыть канал и заставить основное приложение разблокировать, как только все данные будут отправлены и подтверждены? В настоящее время он заблокирован от вызова f.channel(). CloseFuture(). Sync() ;. но после отправки всех данных и получения подтверждения, как я могу разблокировать основное приложение. Я думал, что если я закрываю канал, closeFuture будет возвращен как истинный, тем самым разблокируя основное приложение. Кроме того, я попытался закрыть канал из FileSenderHandler и FileReceiverHandler с помощью ctx.channel(). Close(), но канал не закрыл и не разблокировал основное приложение.

Причина, по которой мне нужно разблокировать приложение, так что я могу распечатать пропускную способность консоли после того, как все данные были отправлены и подтверждены. если у меня есть несколько каналов данных, и программа заблокирована, будет напечатана только первая пропускная способность канала передачи данных. Таким образом, FileSender.java будет выглядеть следующим образом. Но даже если у меня есть один канал данных, и я пытаюсь закрыть канал в FileSenderHandler, основное приложение (FileSender.java) все еще блокируется и зависает на ChannelFuture.channel(). CloseFuture(). Sync(); Чтобы выйти, я должен ввести управление C на терминале. ЛЮБЫЕ ИДЕИ НА КАК Я МОГУ ОСУЩЕСТВИТЬ ОСНОВНОЕ ПРИЛОЖЕНИЕ ВСЕ ДАННЫЕ, ПОЛУЧЕНЫ И ПОЛУЧЕНЫ?

FileSender.java - бутстрапирования канал и связывает этот клиент/хост для другого хоста

public static void main(String[] args) throws Exception { 
// Configure the client/ File Sender 
EventLoopGroup group = new NioEventLoopGroup(); 
try { 
for (int i =0; i<numOfDataChannels; i++) { 
Bootstrap b = new Bootstrap(); 
b.group(group) 
.channel(NioSocketChannel.class) 
.option(ChannelOption.TCP_NODELAY, true) 
.handler(new FileSenderInitializer()); 

// Start the client. 
ChannelFuture f = b.connect(HOST, PORT).sync(); 

    addChannelFutureToList(f); 
} 

// Wait until the connection is closed for each data channel, but also who can actually close the channel 
for (ChannelFuture f: channelFutureList){ 
    f.channel().closeFuture().sync(); 
} 

//When Channel is closed PRINT THROUGHPUT OF ALL THE DATA CHANNELS 
printThroughput(); 
} finally { 
// Shut down the event loop to terminate all threads. 
group.shutdownGracefully(); 
} 
} 
} 

FileSenderHandler.java - Ханделс I/O события канала, такие как Read/Write

public void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { 
try { 
. 
. 
//After received msg Ack, close the channel, this should unblock the main application (FileSender.java) since after closing the channel closeFuture will be fulfilled 
ctx.channel().close(); 

}catch(Exception e){ 
    System.err.printf("ChannelRead Error Msg: " + e.getMessage()); 
    e.printStackTrace(); 

}