2015-01-19 1 views
0

Я использую ниже код для подключения моего пользовательского Java NIO сервер:Необходимости ServerSocketChannel принять 1000 соединения TCP в секунду

public static void main(String[] args) { 
     try { 

String value[] = { "00*********402", "00*********383",.....} 
     int i = 0; 

      while (i < value.length) { 
       RunnableDemo temp = new RunnableDemo(value[i]); 
       temp.start(); 
       i++; 
       try { 
        Thread.sleep(1000); //REDUCING THIS TIME CAUSE PROBLEM 
       } catch (InterruptedException e) { 
        e.printStackTrace(); 
       } 
      } 

     } catch (Exception e) { 

      e.printStackTrace(); 
     } 
    } 

    class RunnableDemo implements Runnable { 

    private Socket socket; 

    private Thread t; 

    private String threadName;// equals with client number 

    RunnableDemo(int phoneNumber) { 
     threadName = String.valueOf(phoneNumber); 
     System.err.println("Creating " + threadName); 

    } 

    RunnableDemo(String phoneNumber) { 
     threadName = phoneNumber; 
     System.err.println("Creating " + threadName); 

    } 

    public void run() { 
     System.err.println("Running " + threadName); 
     try { 

      //socket = new Socket("94.232.174.97", 4664); 
      socket = new Socket("192.168.20.22", 4664); 
      PrintWriter testWriter = new PrintWriter(new OutputStreamWriter(
        socket.getOutputStream())); 
      testWriter.print(threadName); 
      testWriter.flush(); 

      String incoming_message = ""; 
      BufferedReader bufferedIn = new BufferedReader(
        new InputStreamReader(socket.getInputStream())); 
      while (true) { 
       if (bufferedIn != null) { 
        incoming_message = bufferedIn.readLine(); 
        System.out.println("recived message: " + incoming_message); 
       } 
      } 

     } catch (Exception e) { 
      System.out.println("Thread " + threadName + " interrupted."); 
      e.printStackTrace(); 
     } 
     System.out.println("Thread " + threadName + " exiting."); 
    } 

    public void read() { 

    } 

    public void start() { 
     System.out.println("Starting " + threadName); 
     try { 
      if (t == null) { 
       t = new Thread(this, threadName); 
       t.start(); 
      } 
     } catch (Exception e) { 
      e.printStackTrace(); 
     } 
    } 

} 

он работает нормально, когда я создаю клиент тему каждые 1000 миллилитров, но когда я уменьшить время к 100mls (соединение 10 клиентов на сервер в секунду) после нескольких секунд мои клиентские потоки получают ниже ошибки:

java.net.ConnectException: Connection refused: connect 
at java.net.DualStackPlainSocketImpl.connect0(Native Method) 
at java.net.DualStackPlainSocketImpl.socketConnect(Unknown Source) 
at java.net.AbstractPlainSocketImpl.doConnect(Unknown Source) 
at java.net.AbstractPlainSocketImpl.connectToAddress(Unknown Source) 
at java.net.AbstractPlainSocketImpl.connect(Unknown Source) 
at java.net.PlainSocketImpl.connect(Unknown Source) 
at java.net.SocksSocketImpl.connect(Unknown Source) 
at java.net.Socket.connect(Unknown Source) 
at java.net.Socket.connect(Unknown Source) 
at java.net.Socket.<init>(Unknown Source) 
at java.net.Socket.<init>(Unknown Source) 
at RunnableDemo.run(Main.java:419) 
at java.lang.Thread.run(Unknown Source) 

Это серверная часть тоже:

public class EchoServer { 

static final org.apache.log4j.Logger logger = org.apache.log4j.Logger.getLogger(Main.class); 

private static final int BUFFER_SIZE = 1024; 

private final static int DEFAULT_PORT = 4664; 
private InetAddress hostAddress = null; 

private int port; 
private String ipAddress = "192.168.20.22"; 
private Selector selector; 

// The buffer into which we'll read data when it's available 
private ByteBuffer readBuffer = ByteBuffer.allocate(BUFFER_SIZE); 

int timestamp = 1; 

HashMap<Integer, String> connectedClients = new HashMap<Integer, String>(); 
HashMap<String, Integer> clientIds= new HashMap<String,Integer>(); 
HashMap<String, String> messageToClients = new HashMap<String, String>(); 


public EchoServer() { 
    this(DEFAULT_PORT); 
} 

public EchoServer(int port) { 
    try{ 
     this.port = port; 
     hostAddress = InetAddress.getByName(ipAddress); 
     selector = initSelector(); 
     loop(); 
    }catch(Exception ex){ 
     logger.error("Exception Accoured:",ex); 
    } 
} 

private Selector initSelector() { 
    try{ 
     Selector socketSelector = SelectorProvider.provider().openSelector(); 
     ServerSocketChannel serverChannel = ServerSocketChannel.open(); 
     serverChannel.configureBlocking(false); 

     InetSocketAddress isa = new InetSocketAddress(hostAddress, port); 
     serverChannel.socket().bind(isa); 
     serverChannel.register(socketSelector, SelectionKey.OP_ACCEPT); 

     return socketSelector; 
    }catch(Exception ex){ 
     logger.error("Exception Accoured:",ex); 
     return null; 
    } 
} 

private void loop() { 
    while (true) { 
     try { 

      // Do defined operations for clients 
      // ------------------------------ 
      selector.select(); 
      Iterator<SelectionKey> selectedKeys = selector.selectedKeys() 
        .iterator(); 

      int c = 0; 
      while (selectedKeys.hasNext()) { 
       SelectionKey key = selectedKeys.next(); 
       selectedKeys.remove(); 

       if (!key.isValid()) { 
        logger.warn(key.hashCode() + "- is invalid"); 
        continue; 
       } 
       // Check what event is available and deal with it 
       if (key.isAcceptable()) { 
        accept(key); 
       } else if (key.isReadable()) { 
        read(key); 
       } else if (key.isWritable()) { 
        write(key); 
       } 
       c++; 

      } 

      logger.info(c + " keys has been iterated"); 

      // Fetch List from server 
      // ----------------------------------------- 
      try { 
       ResultSet resultset = DataBase.getInstance() 
         .getQueryResult(); 


       while (resultset.next()) { 
        String mobileNumber = resultset.getString("MobileNo"); 

        String message = resultset.getInt("IsMessage") + "," 
          + resultset.getInt("IsDeliver") + "," 
          + resultset.getInt("IsGroup") + "," 
          + resultset.getInt("IsSeen"); 
        messageToClients.put(mobileNumber, message); 

       } 



      } catch (Exception ex) { 
       //ex.printStackTrace(); 
       logger.error("Exception Accoured:",ex); 
      } 

      // Wait for 1 second 
      // ----------------------------------------------- 
      Thread.sleep(1000); 
      timestamp++; 

     } catch (Exception e) { 
      e.printStackTrace(); 
      System.exit(1); 
     } 

    } 
} 

private void accept(SelectionKey key) { 

    try{ 
     // Initialize the connection ------------------------------------------ 
     ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key 
       .channel(); 
     SocketChannel socketChannel = serverSocketChannel.accept(); 
     socketChannel.configureBlocking(false); 
     socketChannel.setOption(StandardSocketOptions.SO_KEEPALIVE, true); 
     socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, true); 
     logger.info("New client accepted"); 

     // Fire read for reading phone number -------------------------------- 
     socketChannel.register(selector, SelectionKey.OP_READ); 
    }catch(Exception ex){ 
     logger.error("Exception Accoured:",ex); 
    } 
} 

private void read(SelectionKey key) { 

    try{ 
     // Initialize Socket ----------------------------------------------------- 
     SocketChannel socketChannel = (SocketChannel) key.channel(); 


     // Reading Client Number ------------------------------------------------- 

     readBuffer.clear(); 

     int numRead; 
     try { 
      numRead = socketChannel.read(readBuffer); 
     } catch (IOException e) { 
      logger.error("Forceful shutdown--->" + key.hashCode()); 
      key.cancel(); 
      return; 
     } 

     // read was not successful 
     if (numRead == -1) { 
      logger.error("Graceful shutdown ---> " + key.hashCode()); 
      key.cancel(); 
      return; 
     } 

     // read was successful and now we can write it to String 
     readBuffer.flip(); 
     byte[] bytes = new byte[readBuffer.limit()]; 
     readBuffer.get(bytes); 

     String number = new String(bytes); 

     number = number.replace("\r\n", ""); 
     number = number.trim(); 

     // Update Connect Clients Status ----------------------------------------- 
     Integer clientId=clientIds.get(number); 
     if (clientId == null) { 
      connectedClients.put(key.hashCode(), number); 
      clientIds.put(number, key.hashCode()); 
      logger.error(number + "- (" + key.hashCode() + ") has Connected"); 
     }else{ 
      connectedClients.remove(clientId); 
      connectedClients.put(key.hashCode(), number); 
      clientIds.put(number, key.hashCode()); 
      logger.error(number + "- (" + key.hashCode() + ") REconnected"); 
     } 

     logger.error("All clients number are:" + connectedClients.size()); 

     // Fire Write Operations ------------------------------------------------- 
     socketChannel.register(selector, SelectionKey.OP_WRITE); 

    }catch(Exception ex){ 
     //ex.printStackTrace(); 
     logger.error("Exception Accoured:",ex); 
    } 
} 

private void write(SelectionKey key) { 
    try { 

     //Check channel still alive ---------------------------------------------- 

     String clientNumber = connectedClients.get(key.hashCode()); 

     if(clientNumber == null){ 
      key.cancel(); 
      logger.info("key with hash=" + key.hashCode() + " canceled"); 
      return; 
     } 

     // Get Channel ----------------------------------------------------------- 
     SocketChannel socketChannel = (SocketChannel) key.channel(); 

     // Send Message if client number have new message ------------------------ 

     if (messageToClients.get(clientNumber) != null) { 
      logger.info(clientNumber + "-" + key.hashCode() 
          + "- Sent write message"); 
      String timeStamp = String.valueOf(timestamp); 
      String message = messageToClients.get(clientNumber); 
      ByteBuffer dummyResponse = ByteBuffer.wrap((message + "\r\n").getBytes("UTF-8")); 
      socketChannel.write(dummyResponse); 
      messageToClients.remove(clientNumber); 
     } 

     // Fire new write state -------------------------------------------------- 
     socketChannel.register(selector, SelectionKey.OP_WRITE); 

    } catch (IOException iox) { 
     logger.error("Exception Accoured:key=" + key.hashCode(),iox); 
     logger.info("$$$key with hash=" + key.hashCode() + " canceled"); 
     key.cancel(); 
    } 
} 

возможно, существует ограничение на прием соединений в секунду на порт ?! Мне нужно, по крайней мере, принимать 1000 tcp-соединений в секунду. может ли кто-нибудь помочь?

UPDATE

Я обновляю количество ожидающих намерений до 1000, используя следующую строку кода:

serverChannel.socket().bind(isa,1000); 

теперь он получает больше клиентов, но еще через несколько секунд я получаю connection refuse ошибку.

+1

Существует ограничение на то, сколько ожидающих соединений может существовать сразу, определяемое сервером, и если ваш сервер медленно принимает соединения, которые будут влиять на максимальную скорость. Вам нужно показать соответствующие части кода сервера. – EJP

+0

Э? * Я * спросил * вы *, чтобы опубликовать свой код сервера. – EJP

+0

Я также отправляю код сервера. –

ответ

0

Вы теряете время в своем цикле выбора, выполняя операции с базой данных, что ограничивает скорость входящих соединений. Не делай этого. Единственной операцией блокировки в цикле выбора должен быть сам выбор.

И вы теряете еще больше времени, выполняя односекундный сон между выборами. Для этого нет никакой причины. Просто избавься от этого.

NB Когда read() возвращает -1, вы должны закрыть канал, а не просто отменить клавишу. В противном случае вы пропускаете каналы.

 Смежные вопросы

  • Нет связанных вопросов^_^