2012-01-27 1 views
3

Вот мой прецедент ... У меня есть служба восходящего потока, которая отправляет мои данные приложения Netty по сети, и эти данные должны быть опубликованы для нескольких клиентов, подключенных к Netty. Данные, отодвинутые к клиентам, должны быть «Передача-кодировка передачи: chunked».Как правильно использовать ChunkedStream

Я нашел ChunkedStream и хотя это может быть, я мог бы создать PipedInputStream и PipedOutputStream (подключенный к PipedInputStream) и записать ChunkedStream к каналу. Затем, когда данные, полученные от моей вверх по течению службы я мог записать данные в PipedOutputStream каналов и было бы быть отправлены клиентам:

В channelConnected

PipedInputStream in = new PipedInputStream(); 
PipedOutputStream out = new PipedOutputStream(in); 
ctx.getChannel().write(new PersistentChunkedStream(in)); 

Отдельный поток публикует данные до связанных каналов

ChannelBuffer buff = ChannelBuffers.copiedBuffer("FOO",CharsetUtil.UTF_8); 
out.write(buff.array()); 
channel.get(ChunkedWriteHandler.class).resumeTransfer(); 

мне пришлось расширить ChunkedStream вернуть null из nextChunk, если имеется 0 байт («приостановить» запись без зависания нити), поэтому я вызываю resumeTransfer после того, как я напишу PipedOutputStream связанного канала. Когда я отладки и пошагово код, я могу видеть flush из ChunkedWriteHandler называют, который не называют:

Channels.write(ctx, writeFuture, chunk, currentEvent.getRemoteAddress()); 

с байтами, которые я написал в PipedOutputStream,, но он никогда не получил от клиента.

HTTP свернуться

~ $ curl -vN http://localhost:8080/stream 
* About to connect() to localhost port 8080 (#0) 
* Trying 127.0.0.1... connected 
* Connected to localhost (127.0.0.1) port 8080 (#0) 
> GET /stream HTTP/1.1 
> User-Agent: curl/7.19.7 (universal-apple-darwin10.0) libcurl/7.19.7 OpenSSL/0.9.8r zlib/1.2.3 
> Host: localhost:8080 
> Accept: */* 
> 
< HTTP/1.1 200 OK 
< Transfer-Encoding: chunked 
< 
### NOTE: NO "FOO" TRANSMIT BACK ### 

Есть мысли? Может быть, есть лучший способ сделать это?

+0

Вот скриншот как раз перед пишет «Foo» на канале: http://screencast.com/t/XATjUfCsre6U Заметим, что вы можете увидеть фрагмент содержит «70, 79, 79 "= FOO –

+0

Это действительно странно, потому что я вижу, что' write0' из 'NioWorker' вызывается и записывается на канал, но ничего на другом конце. Я даже не забудьте использовать «-N» завитка, чтобы предотвратить буферизацию на стороне клиента. Я также позволил ему запустить цикл в течение некоторого времени, думая, что что-то нужно покраснеть, но все равно нет кубиков. –

+0

Это должно быть я. Я вижу это в WireShark. Может быть, есть терминатор, который я не передаю, чтобы сообщить, что кусок был получен. –

ответ

4

Интересно, почему вы даже хотите использовать PipedInputStream/PipedOutputStream. Я думаю, что было бы проще/проще просто вызвать Channel.write (..) напрямую без ваших данных. Просто будьте внимательны, чтобы представить как можно больше данных Channel.write (..), поскольку это дорогостоящая операция.

Вы можете вызвать Channel.write (..) из любого потока, который вы хотите, в качестве поточно-безопасного.

+0

Поскольку для передачи данных я должен передать ChunkedInput в Channel.write (..). В настоящее время единственными реализованными ChunkedInput являются файлы и потоки. Поскольку данные, которые я получаю для пересылки клиентам, не являются файлами и являются постоянными, я решил, что попытаюсь использовать поток. Затем я мог бы подключать потоки и пересылать данные вверх по течению к подключенным клиентам. –

+0

(продолжение) В моем примере у меня есть клиент, подключенный к ожиданию данных с сервера, поэтому, когда сервер получает некоторые данные из восходящего потока, ему необходимо отправить его клиенту (но он будет помечен). Соединение остается открытым, и когда больше данных поступает из восходящего потока, оно передается клиенту. Я искал способ периодически пересылать переданные данные клиенту с постоянным подключением. (fyi, PipedInput/PipedOutputStream был тем, что я использовал для моего тестового примера) –

+0

Я знаю, что вам нужно отправить куски, но вы можете сделать это только с Channel.write (..), просто убедитесь, что вы не закрываете Канал, прежде чем вы закончите с ним. Channel.write (..) просто напишите ChannelBuffers, то же самое верно для ChunkedInput .. –

3

Просто добавьте еще немного контента в ответ, предоставленный Норманном.

При передаче произвольных фрагментов данных, вы должны сначала послать новый DefaultHttpResponse (только один раз):

HttpResponse res = new DefaultHttpResponse(); 
res.setChunked(true); 
res.setHeader(Names.TRANSFER_ENCODING, Values.CHUNKED); 
channel.write(res); 

Тогда в любое время вы хотите записать в канал с произвольным куске, звоните:

HttpChunk chunk = new DefaultHttpChunk(ChannelBuffers.wrappedBuffer(str.getBytes(CharsetUtil.UTF_8))); 
channel.write(chunk); 
+1

И не забудьте написать фрагмент нулевой длины, чтобы сигнализировать конец ответа на браузер: channel.write (новый DefaultHttpChunk (ChannelBuffers.EMPTY_BUFFER)); – Paul

+0

Обратите внимание, что '.setHeader (...) 'устарел в течение некоторого времени; используйте '.headers(). set (...)', чтобы сделать то же самое через объект 'HttpHeaders' вместо – bbarker

2

Я знаю, что это старый вопрос, но, надеюсь, это помогает кому-то.

ChunkedStream НЕ подразумевает HTTP Chunking ... это неудачное столкновение имен, которое я могу сказать. Chunked streams только для того, чтобы не загружать весь элемент в память, эффективно ChunkedWriter пересылает обратно в ChunkedStream после каждого фрагмента, чтобы запросить дополнительные данные.

Как выясняется, вы можете использовать парадигму ChunkedStream для создания чего-то, что делает HTTP-chunking для вас из стандартного потока ввода. Код ниже реализует ChunkedInput и принимает InputStream. Он также автоматически добавляет конечный http chunk для указания EOF, но делает это только один раз в соответствии с спецификацией ChunkedInput.

public class HttpChunkStream implements ChunkedInput { 

private static final int CHUNK_SIZE = 8192; 
boolean eof = false; 

InputStream data; 

HttpChunkStream (InputStream data) { 
    this.data= data; 
} 

byte[] buf = new byte[CHUNK_SIZE]; 
@Override 
public Object nextChunk() throws Exception { 
    if (eof) 
     return null;   
    int b = data.read(buf); 
    if (b==-1) { 
     eof=true; 
     return new DefaultHttpChunk(ChannelBuffers.EMPTY_BUFFER);   
    }     
    DefaultHttpChunk c = new DefaultHttpChunk(ChannelBuffers.wrappedBuffer(buf,0,b));  
    return c; 
} 

@Override 
public boolean isEndOfInput() throws Exception { 
    return eof; 
} 

@Override 
public boolean hasNextChunk() throws Exception { 
    return isEndOfInput()==false; 
} 

@Override 
public void close() throws Exception { 
    Closeables.closeQuietly(data);    
} 

}

 Смежные вопросы

  • Нет связанных вопросов^_^