2012-04-12 5 views
0

Final Редактировать/ЗаключениеНетти messageReceived() не будет вызвана после того, как некоторое время с долгоживущими соединениями

Это была проблема не связана с Нетти, по-прежнему очень трудно отлаживать. Рабочие потоки в messageReceived иногда блокировались, поэтому через некоторое время в пуле не было потоков.

Оригинальная проблема

В моей компании мы используем NETTY для прослушивания соединения форм устройства слежения GPS. Трекеры общаются по GPRS.

Мы испытали очень странное поведение netty 3.2.4-final.

Через некоторое время (я не могу точно сказать, сколько, но близко к одному дню) мы не получаем никаких сообщений от трекеров. Это означает, что метод messageReceived нашей реализации SimpleCahnnelUpstreamHandler не будет вызываться! Однако, если я захвачу все пакеты с помощью tcpdump, я могу видеть все входящие сообщения!

Это известная проблема, которая уже исправлена ​​в более поздней версии нетти?

Наш трубопровод канал выглядит следующим образом:

... 
final TcpListenerChannelHandler tcpChannelHandler; 


@Inject 
public TcpListenerPipeline(TcpListenerChannelHandler tcpChannelHandler) { 
    this.tcpChannelHandler = tcpChannelHandler; 
} 

@Override 
public ChannelPipeline getPipeline() throws Exception { 
     ChannelPipeline p = Channels.pipeline(); 
     p.addLast("frameDecoder", new DelimiterBasedFrameDecoder(2048, Delimiters.lineDelimiter())); 
     p.addLast("encoder", new ByteArrayWrapperEncoder()); 
     p.addLast("handler", tcpChannelHandler); 
     return p; 
} 
... 

Мы Instantiate прослушивание следующим образом:

public void startListen() { 
     ChannelFactory channelFactory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool(),20); 
     bootstrap = new ServerBootstrap(channelFactory); 
     bootstrap.setPipelineFactory(pipeline); 
     bootstrap.setOption("child.tcpNoDelay", true); 
     bootstrap.setOption("child.keepAlive", true); 
     lazyLogger.getLogger().info("Binding Tcp listener to 0.0.0.0 on port '{}'", listenPort); 
     serverChannel = bootstrap.bind(new InetSocketAddress("0.0.0.0", listenPort)); 
} 

ли кто-нибудь есть ключ, что может быть не так? Или мы просто вручную отключим весь канал, скажем, час или около того?

EDIT:

У меня есть еще некоторая информация о проблеме

При отсутствии сообщений обрабатываются, это происходит также, что channelConnected не вызывается при успешном удаленном подключении. Я отлажена проблему удаленно и обнаружил, что:

  • на NioServerSocketPipelineSink.java линии # 246 registerAcceptedChannel (acceptedSocket, currentThread); происходит
  • Выполнение программного обеспечения доходит до DefaultChannelPipeline line # 781 с различными событиями, но мой TcpListenerChannelHandler никогда не является контекстом.

Самое странное, что иногда netty замечает, что канал подключен, а иногда и нет.

EDIT2:

TcpListenerCahnnelHandler простая реализация SimpleChannelUpstreamHandler

Основные из него:

public class TcpListenerChannelHandler extends SimpleChannelUpstreamHandler { 
... 
@Override 
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { 
    super.channelConnected(ctx, e); 
    _logger.info("{} device connected from: {}", deviceProtocol.getName(), ctx.getChannel().getRemoteAddress()); 
    deviceConnectionRegistry.channelConnected(ctx.getChannel()); 
} 

@Override 
public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { 
    super.channelDisconnected(ctx, e); 
    _logger.info("{} device from endpoint '{}' disconnected.", deviceProtocol.getName(), ctx.getChannel().getRemoteAddress()); 
    deviceConnectionRegistry.channelDisconnected(ctx.getChannel()); 
} 

@Override 
public void messageReceived(ChannelHandlerContext ctx, MessageEvent messageEvent) throws Exception { 
    super.messageReceived(ctx, messageEvent); 

    ... 
    NOTE: here we process the meassage, I do not think it can cause any problem 
} 


@Override 
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception { 
     if(_logger.isWarnEnabled()) 
      _logger.warn(deviceProtocol.getName()+ " device" 
        +e.getChannel().getRemoteAddress()+" channel", e.getCause()); 

     if (!(e.getCause() instanceof ConnectException)) 
      e.getChannel().close(); 
} 

В то же время я повышен до 3.3.1-финал. И если проблема повторится, у меня есть идея, где продолжить отладку.

EDIT 3:

Я повышен до 3.3.1 окончательных, и через два дня та же проблема повторялась.

Я не знаю, связано ли это, но у нас есть больше IP-адресов на одном физическом интерфейсе. Должны ли мы прослушивать только один интерфейс? Есть ли какие-либо известные проблемы с большим количеством интерфейсов?

Но опять же: tcpdump распознает сообщение для трекеров, но netty не вызывает messageReceived в моем пользовательском обработчике.

EDIT 4:

Я отлажена код дальше. Проблема возникает в NioWorker.java В строке 131 (boolean предлагается = registerTaskQueue.offer (registerTask);) работает нормально, но тогда задача никогда не будет обработана. Это означает, что RegisterTask.run() в строке 748 никогда не будет вызван.

+0

Не могли бы вы включить свои пользовательские обработчики? –

+0

Я включил соответствующую часть TcpListenerChannelHandler, которая является единственным пользовательским обработчиком, помогает ли это? – Szobi

ответ

1

Не знаю, вы пытались добавить LoggingHandler, чтобы посмотреть все? я использую, чтобы использовать пользовательский обработчик:

/** 
* 
* Adapted from the original LoggingHandler in Netty. 
*/ 
public class LoggingHandler implements ChannelUpstreamHandler, ChannelDownstreamHandler { 

    String name; 
    boolean hexDump; 

    public LoggingHandler(String name, boolean hexDump) { 
     this.name = name; 
     this.hexDump = hexDump; 
    } 

    /** 
    * Logs the specified event to the {@link InternalLogger} returned by 
    * {@link #getLogger()}. If hex dump has been enabled for this handler, 
    * the hex dump of the {@link ChannelBuffer} in a {@link MessageEvent} will 
    * be logged together. 
    */ 
    public void log(ChannelEvent e) { 

     String msg = name + " >> " + e.toString(); 

     // Append hex dump if necessary. 
     if (hexDump && e instanceof MessageEvent) { 
      MessageEvent me = (MessageEvent) e; 
      if (me.getMessage() instanceof ChannelBuffer) { 
       ChannelBuffer buf = (ChannelBuffer) me.getMessage(); 
       msg = msg + " - (HEXDUMP: " + ChannelBuffers.hexDump(buf) + ')'; 
      } 
     } 

     // Log the message (and exception if available.) 
     if (e instanceof ExceptionEvent) { 
      Logger.debug(this, msg, ((ExceptionEvent) e).getCause()); 
     } else { 
      Logger.debug(this, msg); 
     } 

    } 



    public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) 
      throws Exception { 
     log(e); 
     ctx.sendUpstream(e); 
    } 

    public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e) 
      throws Exception { 
     log(e); 
     ctx.sendDownstream(e); 
    } 

которым вставляется как в клиентской и серверной стороне. На стороне сервера я использую его как для дочернего, так и для родительского:

ChannelFactory factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), 
       Executors.newCachedThreadPool()); 
     ServerBootstrap bootstrap = new ServerBootstrap(factory); 
     bootstrap.setOption("child.tcpNoDelay", true); 
     bootstrap.setOption("child.keepAlive", true); 

     bootstrap.setPipelineFactory(new ChannelPipelineFactory() { 

      public ChannelPipeline getPipeline() throws Exception { 
       ChannelPipeline pipeline = Channels.pipeline(); 
       pipeline.addLast("LOGGER", new LoggingHandler("SERVER", true)); 
       pipeline.addLast("LAUNCHER", handler.new OnChannelConnectedPlugger()); 
       return pipeline; 
      } 
     }); 
     bootstrap.setParentHandler(new LoggingHandler("SERVER-PARENT", true)); 
+0

Спасибо за tipp, я попробую добавить некоторые регистрации! – Szobi