во-первых, позвольте мне объяснить контекст:Отправить несколько asynchonous запросов на клиенте Нетти
Я должен создать клиент, который будет отправлять много HTTP-запросов для загрузки изображений. Эти запросы должны быть асинхронными, поскольку, как только изображение будет завершено, оно будет добавлено в очередь, а затем распечатает на экране. Поскольку изображения могут быть большими, а ответы распределены, мой обработчик должен объединить его в буфер.
Итак, я следую кодам примеров Netty (HTTP spoon example).
В настоящее время у меня есть три статических карты для хранения для каждого канала идентификатора канала и буфера/куба boolean/моего конечного объекта.
private static final ConcurrentHashMap<Integer, ChannelBuffer> BUFFER_MAP = new ConcurrentHashMap<Integer, ChannelBuffer>();
private static final ConcurrentHashMap<Integer, ImagePack> PACK_MAP = new ConcurrentHashMap<Integer, ImagePack>();
private static final ConcurrentHashMap<Integer, Boolean> CHUNKS_MAP = new ConcurrentHashMap<Integer, Boolean>();
После этого я создаю свой загрузочный клиент и счетчик countDown количество ожидающих запросов. Окончательная очередь и счетчик передаются моему обработчику, когда изображение ответа завершено.
final ClientBootstrap bootstrap = new ClientBootstrap(
new NioClientSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool()));
bootstrap.setOption("keepAlive", true);
bootstrap.setOption("tcpNoDelay", true);
bootstrap.setOption("reuseAddress", true);
bootstrap.setOption("connectTimeoutMillis", 30000);
final CountDownLatch latch = new CountDownLatch(downloadList.size()) {
@Override
public void countDown() {
super.countDown();
if (getCount() <= 0) {
try {
queue.put(END_OF_QUEUE);
bootstrap.releaseExternalResources();
} catch (InterruptedException ex) {
LOGGER.log(Level.WARNING, ex.getMessage(), ex);
}
}
}
};
bootstrap.getPipeline().addLast("codec", new HttpClientCodec());
bootstrap.getPipeline().addLast("handler", new TileClientHandler(queue, latch));
После этого я создаю канал для каждого загружаемого изображения и когда канал подключен, запрос будет создан и отправлен. Хост и порт уже были извлечены раньше.
for (final ImagePack pack : downloadList) {
final ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));
future.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture cf) throws Exception {
final Channel channel = future.getChannel();
PACK_MAP.put(channel.getId(), pack);
final HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, pack.url);
request.setHeader(HttpHeaders.Names.HOST, host);
request.setHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);
request.setHeader(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.BYTES);
if (channel.isWritable()) {
channel.write(request);
}
}
});
}
Теперь это мой ChannelHandler, который является внутренним классом, который распространяется SimpleChannelUpstreamHandler
. Когда канал подключен, создается новая запись в BUFFER_MAP
и в CHUNKS_MAP
. BUFFER_MAP
содержит все буферы изображений, используемые обработчиком для объединения фрагментов изображения из каналов, а CHUNKS_MAP
содержит ответ chunked boolean. По завершении ответа в очередь добавляется изображение InputSteam
, задержка защелки и канал закрыты.
private class TileClientHandler extends SimpleChannelUpstreamHandler {
private CancellableQueue<Object> queue;
private CountDownLatch latch;
public TileClientHandler(final CancellableQueue<Object> queue, final CountDownLatch latch) {
this.queue = queue;
this.latch = latch;
}
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
if(!BUFFER_MAP.contains(ctx.getChannel().getId())){
BUFFER_MAP.put(ctx.getChannel().getId(), new DynamicChannelBuffer(50000));
}
if(!CHUNKS_MAP.contains(ctx.getChannel().getId())){
CHUNKS_MAP.put(ctx.getChannel().getId(), false);
}
}
@Override
public void writeComplete(ChannelHandlerContext ctx, WriteCompletionEvent e) throws Exception {
super.writeComplete(ctx, e);
if(!BUFFER_MAP.contains(ctx.getChannel().getId())){
BUFFER_MAP.put(ctx.getChannel().getId(), new DynamicChannelBuffer(50000));
}
if(!CHUNKS_MAP.contains(ctx.getChannel().getId())){
CHUNKS_MAP.put(ctx.getChannel().getId(), false);
}
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
final Integer channelID = ctx.getChannel().getId();
if (!CHUNKS_MAP.get(channelID)) {
final HttpResponse response = (HttpResponse) e.getMessage();
if (response.isChunked()) {
CHUNKS_MAP.put(channelID, true);
} else {
final ChannelBuffer content = response.getContent();
if (content.readable()) {
final ChannelBuffer buf = BUFFER_MAP.get(channelID);
buf.writeBytes(content);
BUFFER_MAP.put(channelID, buf);
messageCompleted(e);
}
}
} else {
final HttpChunk chunk = (HttpChunk) e.getMessage();
if (chunk.isLast()) {
CHUNKS_MAP.put(channelID, false);
messageCompleted(e);
} else {
final ChannelBuffer buf = BUFFER_MAP.get(channelID);
buf.writeBytes(chunk.getContent());
BUFFER_MAP.put(channelID, buf);
}
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
e.getCause().printStackTrace();
latch.countDown();
e.getChannel().close();
}
private void messageCompleted(MessageEvent e) {
final Integer channelID = e.getChannel().getId();
if (queue.isCancelled()) {
return;
}
try {
final ImagePack p = PACK_MAP.get(channelID);
final ChannelBuffer b = BUFFER_MAP.get(channelID);
p.setBuffer(new ByteArrayInputStream(b.array()));
queue.put(p.getTile());
} catch (Exception ex) {
LOGGER.log(Level.WARNING, ex.getMessage(), ex);
}
latch.countDown();
e.getChannel().close();
}
}
Моя проблема, когда я выполняю этот код, у меня есть эти исключения:
java.lang.IllegalArgumentException: invalid version format: 3!}@
at org.jboss.netty.handler.codec.http.HttpVersion.<init>(HttpVersion.java:108)
at org.jboss.netty.handler.codec.http.HttpVersion.valueOf(HttpVersion.java:68)
at org.jboss.netty.handler.codec.http.HttpResponseDecoder.createMessage(HttpResponseDecoder.java:110)
at org.jboss.netty.handler.codec.http.HttpMessageDecoder.decode(HttpMessageDecoder.java:198)
at org.jboss.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:113)
at org.jboss.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:101)
at org.jboss.netty.handler.codec.replay.ReplayingDecoder.callDecode(ReplayingDecoder.java:470)
at org.jboss.netty.handler.codec.replay.ReplayingDecoder.messageReceived(ReplayingDecoder.java:443)
at org.jboss.netty.handler.codec.http.HttpClientCodec.handleUpstream(HttpClientCodec.java:77)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:274)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:261)
at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:351)
at org.jboss.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:282)
at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:202)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)
java.lang.IllegalArgumentException: invalid version format:
at org.jboss.netty.handler.codec.http.HttpVersion.<init>(HttpVersion.java:108)
at org.jboss.netty.handler.codec.http.HttpVersion.valueOf(HttpVersion.java:68)
at org.jboss.netty.handler.codec.http.HttpResponseDecoder.createMessage(HttpResponseDecoder.java:110)
at org.jboss.netty.handler.codec.http.HttpMessageDecoder.decode(HttpMessageDecoder.java:198)
at org.jboss.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:113)
at org.jboss.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:101)
at org.jboss.netty.handler.codec.replay.ReplayingDecoder.callDecode(ReplayingDecoder.java:470)
at org.jboss.netty.handler.codec.replay.ReplayingDecoder.cleanup(ReplayingDecoder.java:546)
at org.jboss.netty.handler.codec.replay.ReplayingDecoder.channelDisconnected(ReplayingDecoder.java:449)
at org.jboss.netty.handler.codec.http.HttpClientCodec.handleUpstream(HttpClientCodec.java:77)
at org.jboss.netty.channel.Channels.fireChannelDisconnected(Channels.java:360)
at org.jboss.netty.channel.socket.nio.NioWorker.close(NioWorker.java:595)
at org.jboss.netty.channel.socket.nio.NioClientSocketPipelineSink.eventSunk(NioClientSocketPipelineSink.java:101)
at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:60)
at org.jboss.netty.handler.codec.http.HttpClientCodec.handleDownstream(HttpClientCodec.java:82)
at org.jboss.netty.channel.Channels.close(Channels.java:720)
at org.jboss.netty.channel.AbstractChannel.close(AbstractChannel.java:200)
at org.geotoolkit.client.map.CachedPyramidSet$TileClientHandler.exceptionCaught(CachedPyramidSet.java:515)
at org.jboss.netty.handler.codec.replay.ReplayingDecoder.exceptionCaught(ReplayingDecoder.java:461)
at org.jboss.netty.handler.codec.http.HttpClientCodec.handleUpstream(HttpClientCodec.java:77)
at org.jboss.netty.channel.Channels.fireExceptionCaught(Channels.java:432)
at org.jboss.netty.channel.AbstractChannelSink.exceptionCaught(AbstractChannelSink.java:52)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:274)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:261)
at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:351)
at org.jboss.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:282)
at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:202)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)
22 mars 2012 15:27:31 org.jboss.netty.channel.DefaultChannelPipeline
ATTENTION: An exception was thrown by a user handler while handling an exception event ([id: 0x3cd16610, /172.16.30.91:34315 :> tile.openstreetmap.org/193.63.75.98:80] EXCEPTION: java.lang.IllegalArgumentException: invalid version format:
java.lang.IllegalStateException: An Executor cannot be shut down from the thread acquired from itself. Please make sure you are not calling releaseExternalResources() from an I/O worker thread.
at org.jboss.netty.util.internal.ExecutorUtil.terminate(ExecutorUtil.java:71)
at org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory.releaseExternalResources(NioClientSocketChannelFactory.java:171)
at org.jboss.netty.bootstrap.Bootstrap.releaseExternalResources(Bootstrap.java:324)
at org.geotoolkit.client.map.CachedPyramidSet$1.countDown(CachedPyramidSet.java:314)
at org.geotoolkit.client.map.CachedPyramidSet$TileClientHandler.exceptionCaught(CachedPyramidSet.java:514)
at org.jboss.netty.handler.codec.replay.ReplayingDecoder.exceptionCaught(ReplayingDecoder.java:461)
at org.jboss.netty.handler.codec.http.HttpClientCodec.handleUpstream(HttpClientCodec.java:77)
at org.jboss.netty.channel.Channels.fireExceptionCaught(Channels.java:432)
at org.jboss.netty.channel.AbstractChannelSink.exceptionCaught(AbstractChannelSink.java:52)
at org.jboss.netty.channel.Channels.fireChannelDisconnected(Channels.java:360)
at org.jboss.netty.channel.socket.nio.NioWorker.close(NioWorker.java:595)
at org.jboss.netty.channel.socket.nio.NioClientSocketPipelineSink.eventSunk(NioClientSocketPipelineSink.java:101)
at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:60)
at org.jboss.netty.handler.codec.http.HttpClientCodec.handleDownstream(HttpClientCodec.java:82)
at org.jboss.netty.channel.Channels.close(Channels.java:720)
at org.jboss.netty.channel.AbstractChannel.close(AbstractChannel.java:200)
at org.geotoolkit.client.map.CachedPyramidSet$TileClientHandler.exceptionCaught(CachedPyramidSet.java:515)
at org.jboss.netty.handler.codec.replay.ReplayingDecoder.exceptionCaught(ReplayingDecoder.java:461)
at org.jboss.netty.handler.codec.http.HttpClientCodec.handleUpstream(HttpClientCodec.java:77)
at org.jboss.netty.channel.Channels.fireExceptionCaught(Channels.java:432)
at org.jboss.netty.channel.AbstractChannelSink.exceptionCaught(AbstractChannelSink.java:52)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:274)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:261)
at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:351)
at org.jboss.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:282)
at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:202)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
at java.lang.Thread.run(Thread.java:662)
А также некоторые NPE появляется несколько раз.
java.lang.NullPointerException
at org.jboss.netty.handler.codec.http.HttpMessageDecoder.skipControlCharacters(HttpMessageDecoder.java:409)
at org.jboss.netty.handler.codec.http.HttpMessageDecoder.decode(HttpMessageDecoder.java:184)
at org.jboss.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:113)
at org.jboss.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:101)
at org.jboss.netty.handler.codec.replay.ReplayingDecoder.callDecode(ReplayingDecoder.java:470)
at org.jboss.netty.handler.codec.replay.ReplayingDecoder.messageReceived(ReplayingDecoder.java:443)
at org.jboss.netty.handler.codec.http.HttpClientCodec.handleUpstream(HttpClientCodec.java:77)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:274)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:261)
at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:351)
at org.jboss.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:282)
at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:202)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
Всего этот код отлично работает для одного запроса, но некоторые странные вещи добавлять на буферах, когда много запросов, где посыл.
Любые идеи, что мне здесь не хватает? Благодарю.
В моей первой версии я дублирую загрузчик/обработчик для каждого запрошенного изображения, он отлично работает, но не очень оптимизирован.
Привет, johnstlr, спасибо за этот быстрый полезный ответ. Теперь я использую ChannelPipelineFactory для создания экземпляров обработчиков HTTPCodec dand my Tile. Он отлично работает, но у меня все еще есть java.lang.IllegalStateException: Исполнитель не может быть отключен от потока, полученного от себя. Убедитесь, что вы не вызываете releaseExternalResources() из потока рабочего процесса ввода-вывода. Исключение. У вас есть идея? А для информации причина, по которой я не использовал HttpChunkAggregator, - это установить размер буфера для конструктора HttpChunkAggregator. – qboileau
Вы вызываете bootstrap.releaseExternalResources из CountDownLatch.countDown, который вызывается из потока ввода-вывода в методах обработчика. К сожалению, вы не можете этого сделать. Вам нужно вызвать releaseExternalResources из потока, который не находится в пуле потоков, который используется Netty. Один из вариантов может заключаться в вызове releaseExternalResources в вашем потоке, который читается из вашей внутренней очереди после завершения обработки очереди. Кроме того, вы совершенно правы в HttpChunkAggregator. Сожалею! – johnstlr