2012-03-22 1 views
5

во-первых, позвольте мне объяснить контекст:Отправить несколько asynchonous запросов на клиенте Нетти

Я должен создать клиент, который будет отправлять много HTTP-запросов для загрузки изображений. Эти запросы должны быть асинхронными, поскольку, как только изображение будет завершено, оно будет добавлено в очередь, а затем распечатает на экране. Поскольку изображения могут быть большими, а ответы распределены, мой обработчик должен объединить его в буфер.

Итак, я следую кодам примеров Netty (HTTP spoon example).

В настоящее время у меня есть три статических карты для хранения для каждого канала идентификатора канала и буфера/куба boolean/моего конечного объекта.

private static final ConcurrentHashMap<Integer, ChannelBuffer> BUFFER_MAP = new ConcurrentHashMap<Integer, ChannelBuffer>(); 
private static final ConcurrentHashMap<Integer, ImagePack> PACK_MAP = new ConcurrentHashMap<Integer, ImagePack>(); 
private static final ConcurrentHashMap<Integer, Boolean> CHUNKS_MAP = new ConcurrentHashMap<Integer, Boolean>(); 

После этого я создаю свой загрузочный клиент и счетчик countDown количество ожидающих запросов. Окончательная очередь и счетчик передаются моему обработчику, когда изображение ответа завершено.

final ClientBootstrap bootstrap = new ClientBootstrap(
      new NioClientSocketChannelFactory(
      Executors.newCachedThreadPool(), 
      Executors.newCachedThreadPool())); 
    bootstrap.setOption("keepAlive", true); 
    bootstrap.setOption("tcpNoDelay", true); 
    bootstrap.setOption("reuseAddress", true); 
    bootstrap.setOption("connectTimeoutMillis", 30000); 


    final CountDownLatch latch = new CountDownLatch(downloadList.size()) { 

     @Override 
     public void countDown() { 
      super.countDown(); 
      if (getCount() <= 0) { 
       try { 
        queue.put(END_OF_QUEUE); 
        bootstrap.releaseExternalResources(); 
       } catch (InterruptedException ex) { 
        LOGGER.log(Level.WARNING, ex.getMessage(), ex); 
       } 
      } 
     } 
    }; 
    bootstrap.getPipeline().addLast("codec", new HttpClientCodec()); 
    bootstrap.getPipeline().addLast("handler", new TileClientHandler(queue, latch)); 

После этого я создаю канал для каждого загружаемого изображения и когда канал подключен, запрос будет создан и отправлен. Хост и порт уже были извлечены раньше.

for (final ImagePack pack : downloadList) { 

     final ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port)); 

     future.addListener(new ChannelFutureListener() { 

      public void operationComplete(ChannelFuture cf) throws Exception { 

       final Channel channel = future.getChannel(); 

       PACK_MAP.put(channel.getId(), pack); 

       final HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, pack.url); 
       request.setHeader(HttpHeaders.Names.HOST, host); 
       request.setHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE); 
       request.setHeader(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.BYTES); 

       if (channel.isWritable()) { 
        channel.write(request); 
       } 
      } 
     }); 
    } 

Теперь это мой ChannelHandler, который является внутренним классом, который распространяется SimpleChannelUpstreamHandler. Когда канал подключен, создается новая запись в BUFFER_MAP и в CHUNKS_MAP. BUFFER_MAP содержит все буферы изображений, используемые обработчиком для объединения фрагментов изображения из каналов, а CHUNKS_MAP содержит ответ chunked boolean. По завершении ответа в очередь добавляется изображение InputSteam, задержка защелки и канал закрыты.

private class TileClientHandler extends SimpleChannelUpstreamHandler { 

    private CancellableQueue<Object> queue; 
    private CountDownLatch latch; 

    public TileClientHandler(final CancellableQueue<Object> queue, final CountDownLatch latch) { 
     this.queue = queue; 
     this.latch = latch; 
    } 

    @Override 
    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { 
     if(!BUFFER_MAP.contains(ctx.getChannel().getId())){ 
      BUFFER_MAP.put(ctx.getChannel().getId(), new DynamicChannelBuffer(50000)); 
     } 
     if(!CHUNKS_MAP.contains(ctx.getChannel().getId())){ 
      CHUNKS_MAP.put(ctx.getChannel().getId(), false); 
     } 
    } 

    @Override 
    public void writeComplete(ChannelHandlerContext ctx, WriteCompletionEvent e) throws Exception { 
     super.writeComplete(ctx, e); 
     if(!BUFFER_MAP.contains(ctx.getChannel().getId())){ 
      BUFFER_MAP.put(ctx.getChannel().getId(), new DynamicChannelBuffer(50000)); 
     } 
     if(!CHUNKS_MAP.contains(ctx.getChannel().getId())){ 
      CHUNKS_MAP.put(ctx.getChannel().getId(), false); 
     } 
    } 

    @Override 
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { 
     final Integer channelID = ctx.getChannel().getId(); 
     if (!CHUNKS_MAP.get(channelID)) { 
      final HttpResponse response = (HttpResponse) e.getMessage(); 

      if (response.isChunked()) { 
       CHUNKS_MAP.put(channelID, true); 

      } else { 
       final ChannelBuffer content = response.getContent(); 
       if (content.readable()) { 
        final ChannelBuffer buf = BUFFER_MAP.get(channelID); 
        buf.writeBytes(content); 
        BUFFER_MAP.put(channelID, buf); 
        messageCompleted(e); 

       } 
      } 
     } else { 
      final HttpChunk chunk = (HttpChunk) e.getMessage(); 
      if (chunk.isLast()) { 
       CHUNKS_MAP.put(channelID, false); 
       messageCompleted(e); 
      } else { 
       final ChannelBuffer buf = BUFFER_MAP.get(channelID); 
       buf.writeBytes(chunk.getContent()); 
       BUFFER_MAP.put(channelID, buf); 
      } 
     } 
    } 

    @Override 
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception { 
     e.getCause().printStackTrace(); 
     latch.countDown(); 
     e.getChannel().close(); 
    } 

    private void messageCompleted(MessageEvent e) { 
     final Integer channelID = e.getChannel().getId(); 
     if (queue.isCancelled()) { 
      return; 
     } 

     try { 
      final ImagePack p = PACK_MAP.get(channelID); 
      final ChannelBuffer b = BUFFER_MAP.get(channelID); 

      p.setBuffer(new ByteArrayInputStream(b.array())); 
      queue.put(p.getTile()); 
     } catch (Exception ex) { 
      LOGGER.log(Level.WARNING, ex.getMessage(), ex); 
     } 
     latch.countDown(); 
     e.getChannel().close(); 
    } 
} 

Моя проблема, когда я выполняю этот код, у меня есть эти исключения:

java.lang.IllegalArgumentException: invalid version format: 3!}@ 
    at org.jboss.netty.handler.codec.http.HttpVersion.<init>(HttpVersion.java:108) 
    at org.jboss.netty.handler.codec.http.HttpVersion.valueOf(HttpVersion.java:68) 
    at org.jboss.netty.handler.codec.http.HttpResponseDecoder.createMessage(HttpResponseDecoder.java:110) 
    at org.jboss.netty.handler.codec.http.HttpMessageDecoder.decode(HttpMessageDecoder.java:198) 
    at org.jboss.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:113) 
    at org.jboss.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:101) 
    at org.jboss.netty.handler.codec.replay.ReplayingDecoder.callDecode(ReplayingDecoder.java:470) 
    at org.jboss.netty.handler.codec.replay.ReplayingDecoder.messageReceived(ReplayingDecoder.java:443) 
    at org.jboss.netty.handler.codec.http.HttpClientCodec.handleUpstream(HttpClientCodec.java:77) 
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:274) 
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:261) 
    at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:351) 
    at org.jboss.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:282) 
    at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:202) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) 
    at java.lang.Thread.run(Thread.java:662) 

java.lang.IllegalArgumentException: invalid version format: 
    at org.jboss.netty.handler.codec.http.HttpVersion.<init>(HttpVersion.java:108) 
    at org.jboss.netty.handler.codec.http.HttpVersion.valueOf(HttpVersion.java:68) 
    at org.jboss.netty.handler.codec.http.HttpResponseDecoder.createMessage(HttpResponseDecoder.java:110) 
    at org.jboss.netty.handler.codec.http.HttpMessageDecoder.decode(HttpMessageDecoder.java:198) 
    at org.jboss.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:113) 
    at org.jboss.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:101) 
    at org.jboss.netty.handler.codec.replay.ReplayingDecoder.callDecode(ReplayingDecoder.java:470) 
    at org.jboss.netty.handler.codec.replay.ReplayingDecoder.cleanup(ReplayingDecoder.java:546) 
    at org.jboss.netty.handler.codec.replay.ReplayingDecoder.channelDisconnected(ReplayingDecoder.java:449) 
    at org.jboss.netty.handler.codec.http.HttpClientCodec.handleUpstream(HttpClientCodec.java:77) 
    at org.jboss.netty.channel.Channels.fireChannelDisconnected(Channels.java:360) 
    at org.jboss.netty.channel.socket.nio.NioWorker.close(NioWorker.java:595) 
    at org.jboss.netty.channel.socket.nio.NioClientSocketPipelineSink.eventSunk(NioClientSocketPipelineSink.java:101) 
    at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:60) 
    at org.jboss.netty.handler.codec.http.HttpClientCodec.handleDownstream(HttpClientCodec.java:82) 
    at org.jboss.netty.channel.Channels.close(Channels.java:720) 
    at org.jboss.netty.channel.AbstractChannel.close(AbstractChannel.java:200) 
    at org.geotoolkit.client.map.CachedPyramidSet$TileClientHandler.exceptionCaught(CachedPyramidSet.java:515) 
    at org.jboss.netty.handler.codec.replay.ReplayingDecoder.exceptionCaught(ReplayingDecoder.java:461) 
    at org.jboss.netty.handler.codec.http.HttpClientCodec.handleUpstream(HttpClientCodec.java:77) 
    at org.jboss.netty.channel.Channels.fireExceptionCaught(Channels.java:432) 
    at org.jboss.netty.channel.AbstractChannelSink.exceptionCaught(AbstractChannelSink.java:52) 
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:274) 
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:261) 
    at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:351) 
    at org.jboss.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:282) 
    at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:202) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) 
    at java.lang.Thread.run(Thread.java:662) 

22 mars 2012 15:27:31 org.jboss.netty.channel.DefaultChannelPipeline 
ATTENTION: An exception was thrown by a user handler while handling an exception event ([id: 0x3cd16610, /172.16.30.91:34315 :> tile.openstreetmap.org/193.63.75.98:80] EXCEPTION: java.lang.IllegalArgumentException: invalid version format: 
java.lang.IllegalStateException: An Executor cannot be shut down from the thread acquired from itself. Please make sure you are not calling releaseExternalResources() from an I/O worker thread. 
    at org.jboss.netty.util.internal.ExecutorUtil.terminate(ExecutorUtil.java:71) 
    at org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory.releaseExternalResources(NioClientSocketChannelFactory.java:171) 
    at org.jboss.netty.bootstrap.Bootstrap.releaseExternalResources(Bootstrap.java:324) 
    at org.geotoolkit.client.map.CachedPyramidSet$1.countDown(CachedPyramidSet.java:314) 
    at org.geotoolkit.client.map.CachedPyramidSet$TileClientHandler.exceptionCaught(CachedPyramidSet.java:514) 
    at org.jboss.netty.handler.codec.replay.ReplayingDecoder.exceptionCaught(ReplayingDecoder.java:461) 
    at org.jboss.netty.handler.codec.http.HttpClientCodec.handleUpstream(HttpClientCodec.java:77) 
    at org.jboss.netty.channel.Channels.fireExceptionCaught(Channels.java:432) 
    at org.jboss.netty.channel.AbstractChannelSink.exceptionCaught(AbstractChannelSink.java:52) 
    at org.jboss.netty.channel.Channels.fireChannelDisconnected(Channels.java:360) 
    at org.jboss.netty.channel.socket.nio.NioWorker.close(NioWorker.java:595) 
    at org.jboss.netty.channel.socket.nio.NioClientSocketPipelineSink.eventSunk(NioClientSocketPipelineSink.java:101) 
    at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:60) 
    at org.jboss.netty.handler.codec.http.HttpClientCodec.handleDownstream(HttpClientCodec.java:82) 
    at org.jboss.netty.channel.Channels.close(Channels.java:720) 
    at org.jboss.netty.channel.AbstractChannel.close(AbstractChannel.java:200) 
    at org.geotoolkit.client.map.CachedPyramidSet$TileClientHandler.exceptionCaught(CachedPyramidSet.java:515) 
    at org.jboss.netty.handler.codec.replay.ReplayingDecoder.exceptionCaught(ReplayingDecoder.java:461) 
    at org.jboss.netty.handler.codec.http.HttpClientCodec.handleUpstream(HttpClientCodec.java:77) 
    at org.jboss.netty.channel.Channels.fireExceptionCaught(Channels.java:432) 
    at org.jboss.netty.channel.AbstractChannelSink.exceptionCaught(AbstractChannelSink.java:52) 
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:274) 
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:261) 
    at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:351) 
    at org.jboss.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:282) 
    at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:202) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) 
    at java.lang.Thread.run(Thread.java:662) 

А также некоторые NPE появляется несколько раз.

java.lang.NullPointerException 
    at org.jboss.netty.handler.codec.http.HttpMessageDecoder.skipControlCharacters(HttpMessageDecoder.java:409) 
    at org.jboss.netty.handler.codec.http.HttpMessageDecoder.decode(HttpMessageDecoder.java:184) 
    at org.jboss.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:113) 
    at org.jboss.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:101) 
    at org.jboss.netty.handler.codec.replay.ReplayingDecoder.callDecode(ReplayingDecoder.java:470) 
    at org.jboss.netty.handler.codec.replay.ReplayingDecoder.messageReceived(ReplayingDecoder.java:443) 
    at org.jboss.netty.handler.codec.http.HttpClientCodec.handleUpstream(HttpClientCodec.java:77) 
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:274) 
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:261) 
    at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:351) 
    at org.jboss.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:282) 
    at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:202) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) 

Всего этот код отлично работает для одного запроса, но некоторые странные вещи добавлять на буферах, когда много запросов, где посыл.

Любые идеи, что мне здесь не хватает? Благодарю.

В моей первой версии я дублирую загрузчик/обработчик для каждого запрошенного изображения, он отлично работает, но не очень оптимизирован.

ответ

5

Проблема в том, что вы используете один HttpClientCodec между всеми вашими каналами. Консоль по умолчанию, указанная в бутстрапе, клонируется для всех каналов, поэтому каждый канал видит один и тот же экземпляр каждого обработчика. Http-кодеки являются stateful, поэтому вы видите, что эффекты разных ответов смешиваются.

Простейшим решением является передача ChannelPipelineFactory в бутстрап. Это будет вызываться для каждого нового канала, и вы можете создать конвейер с новыми экземплярами HttpClientCodec. Нет ничего, что помешало бы вам использовать один и тот же экземпляр TileClientHandler для каждого создаваемого вами конвейера, если это так, как он должен работать.

Мне все же интересно.Учитывая, что вы делаете каждый запрос одновременно, было бы проще просто добавить HttpChunkAggregator выше HttpClientCodec и позволить Netty агрегировать все куски в один HttpResponse. Тогда вы просто возьмете собранный контент оттуда?

+0

Привет, johnstlr, спасибо за этот быстрый полезный ответ. Теперь я использую ChannelPipelineFactory для создания экземпляров обработчиков HTTPCodec dand my Tile. Он отлично работает, но у меня все еще есть java.lang.IllegalStateException: Исполнитель не может быть отключен от потока, полученного от себя. Убедитесь, что вы не вызываете releaseExternalResources() из потока рабочего процесса ввода-вывода. Исключение. У вас есть идея? А для информации причина, по которой я не использовал HttpChunkAggregator, - это установить размер буфера для конструктора HttpChunkAggregator. – qboileau

+1

Вы вызываете bootstrap.releaseExternalResources из CountDownLatch.countDown, который вызывается из потока ввода-вывода в методах обработчика. К сожалению, вы не можете этого сделать. Вам нужно вызвать releaseExternalResources из потока, который не находится в пуле потоков, который используется Netty. Один из вариантов может заключаться в вызове releaseExternalResources в вашем потоке, который читается из вашей внутренней очереди после завершения обработки очереди. Кроме того, вы совершенно правы в HttpChunkAggregator. Сожалею! – johnstlr