2012-01-24 3 views
3

Я написал сервер, который принимает сообщения о подключении и бомбардировках (~ 100 байт) с использованием текстового протокола, и моя реализация может отправлять сообщения о шлейфе 400K/sec с клиентским интерфейсом 3rt. Я выбрал Netty для этой задачи, SUSE 11 RealTime, JRockit RTS. Но когда я начал разрабатывать собственный клиент на основе Netty, я столкнулся с резким сокращением пропускной способности (от 400 К до 1,3 КБ/сек). Код клиента довольно прост. Не могли бы вы дать совет или показать примеры, как писать гораздо более эффективный клиент. Я, на самом деле, больше заботился о латентности, но начал с тестов пропускной способности, и я не думаю, что нормально иметь 1.5Kmsg/sec на loopback. P.S. клиентская цель - получать сообщения только от сервера и очень редко посылать сердечные сокращения.Проблемы с нагрузкой на загрузку Java Netty

Client.java 

public class Client { 

private static ClientBootstrap bootstrap; 
private static Channel connector; 
public static boolean start() 
{ 
    ChannelFactory factory = 
     new NioClientSocketChannelFactory(
       Executors.newCachedThreadPool(), 
       Executors.newCachedThreadPool()); 
    ExecutionHandler executionHandler = new ExecutionHandler(new OrderedMemoryAwareThreadPoolExecutor(16, 1048576, 1048576)); 

    bootstrap = new ClientBootstrap(factory); 

    bootstrap.setPipelineFactory(new ClientPipelineFactory()); 

    bootstrap.setOption("tcpNoDelay", true); 
    bootstrap.setOption("keepAlive", true); 
    bootstrap.setOption("receiveBufferSize", 1048576); 
    ChannelFuture future = bootstrap 
      .connect(new InetSocketAddress("localhost", 9013)); 
    if (!future.awaitUninterruptibly().isSuccess()) { 
     System.out.println("--- CLIENT - Failed to connect to server at " + 
          "localhost:9013."); 
     bootstrap.releaseExternalResources(); 
     return false; 
    } 

    connector = future.getChannel(); 

    return connector.isConnected(); 
} 
public static void main(String[] args) 
{ 
    boolean started = start(); 
    if (started) 
     System.out.println("Client connected to the server"); 
} 

} 

ClientPipelineFactory.java 

public class ClientPipelineFactory implements ChannelPipelineFactory{ 

private final ExecutionHandler executionHandler; 
public ClientPipelineFactory(ExecutionHandler executionHandle) 
{ 
    this.executionHandler = executionHandle; 
} 
@Override 
public ChannelPipeline getPipeline() throws Exception { 
    ChannelPipeline pipeline = pipeline(); 
    pipeline.addLast("framer", new DelimiterBasedFrameDecoder(
       1024, Delimiters.lineDelimiter())); 
    pipeline.addLast("executor", executionHandler); 
    pipeline.addLast("handler", new MessageHandler()); 

    return pipeline; 
} 

} 

MessageHandler.java 
public class MessageHandler extends SimpleChannelHandler{ 

long max_msg = 10000; 
long cur_msg = 0; 
long startTime = System.nanoTime(); 
@Override 
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { 
    cur_msg++; 

    if (cur_msg == max_msg) 
    { 
     System.out.println("Throughput (msg/sec) : " + max_msg* NANOS_IN_SEC/( System.nanoTime() - startTime) ); 
     cur_msg = 0; 
     startTime = System.nanoTime(); 
    } 
} 

@Override 
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) { 
    e.getCause().printStackTrace(); 
    e.getChannel().close(); 
} 

} 

Обновление. На стороне сервера существует периодический поток, который записывается на принятый клиентский канал. И канал скоро станет неприемлемым. Обновление N2. Добавлен OrderedMemoryAwareExecutor в конвейере, но все еще есть очень низкая пропускная способность (около 4k msg/sec)

Исправлено. Я поставил исполнитель перед целым стеком трубопроводов, и он сработал!

+0

Я бы отправил timestamp в сообщениях и получил латентность каждого сообщения. Это может дать вам более подробную информацию о том, что такое задержка. Если вы только обмениваетесь данными на одном и том же хосте, и для вас важна латентность, вы можете вместо этого использовать разделяемую память. –

+0

На самом деле сервер будет удален. Я реализовал эмулятор, чтобы проверить, сколько сообщений в секунду может обрабатываться клиентом. И получается, что наивная реализация клиента нетто медленна. –

+0

Ваш код говорит ItchClientPipelineFactory, но тогда вставленный код предназначен для ClientPipelineFactory. Является ли это просто ошибкой именования, или это тот случай, когда в ItchClientPipelineFactory есть не оптимальный код (т. Е. Не используется обработчик правильного сообщения), и вы забыли, что все еще используете его? – user467257

ответ

3

Если сервер отправляет сообщения с фиксированным размером (~ 100 байт), вы можете установить ReceiveBufferSizePredictor клиентского загрузчик, это позволит оптимизировать для чтения

bootstrap.setOption("receiveBufferSizePredictorFactory", 
      new AdaptiveReceiveBufferSizePredictorFactory(MIN_PACKET_SIZE, INITIAL_PACKET_SIZE, MAX_PACKET_SIZE)); 

Согласно сегменту коды вы публикуемый : Нить рабочего потока клиента делает все в конвейере, поэтому он будет занят декодированием и выполнением обработчиков сообщений. Вы должны добавить обработчик выполнения.

Вы сказали, что канал становится невоспроизводимым со стороны сервера, поэтому вам может потребоваться настроить размеры водяного знака в бутстрапе сервера. вы можете периодически контролировать размер буфера записи (размер очереди записи) и следить за тем, что канал становится невоспроизводимым из-за того, что сообщения не могут быть записаны в сеть. Это можно сделать, используя класс util, как показано ниже.

package org.jboss.netty.channel.socket.nio; 

import org.jboss.netty.channel.Channel; 

public final class NioChannelUtil { 
    public static long getWriteTaskQueueCount(Channel channel) { 
    NioSocketChannel nioChannel = (NioSocketChannel) channel; 
    return nioChannel.writeBufferSize.get(); 
    } 
} 
+0

Jestan, спасибо за ответ. Я добавил ExecutionHandler в конвейер перед обработчиком бизнес-логики (где я измеряю пропускную способность), но это не сработало. Тем не менее, throuhput очень низок, а серверный канал недоступен для записи. Я попытался найти класс NioSocketChannel, но не смог. –

+0

@EgorLakomkin, поэтому ваша проблема еще не решена? Отсутствует NioSocketChannel или отсутствует пакет 'org.jboss.netty.channel.socket.nio'? что такое версия Netty? –

+0

Netty 4.x, похоже, не разрешает доступ к объекту writeBufferSize, и, похоже, пакет также изменился. – Scot