2013-03-27 1 views
2

Я написал следующие две строкиJava Threading в TCP и серверных сокетов

ServerSocket mobCom = new ServerSocket(9846); 
Socket server = mobCom.accept(); 

хочу новое соединение TCP должен быть создан, и что соединение будет обрабатываться новой нитью. Например, приведенный выше код создает серверный сокет. И есть несколько клиентов. Всякий раз, когда клиент подключается к серверу, может создаваться новый поток, который будет обслуживать запросы, поступающие от этого конкретного клиента. Как мне реализовать то же самое.

EDIT

Я также хочу, чтобы ограничить пул потоков для 10 пользователей. И если появляется больше пользователей, я хочу отправить им сообщение об ошибке, не обрабатывая от них дополнительные запросы.

+1

Есть около миллиона примеров этого в сети ... –

+0

См [этот ответ] (http://stackoverflow.com/a/15350754/597657). –

ответ

-1

Вам нужно будет сделать что-то вроде следующего. ServiceThread - это поток запросов на обслуживание.

while (true) { 
       try { 
        Socket clientSocket = null; 
        if (null != serverSocket) { 
        clientSocket = serverSocket.accept(); 
        ServiceThread serverThread = new ServiceThread(clientSocket); // Create a new thread for each client 
        serverThread.start(); 
        } 
       } catch(Exception ex) { 
        System.out.println("Exception while accepting connection " + ex.getMessage()); 
        ex.printStackTrace(); 
       } 
+0

Этот код курит CPU, если 'serverSocket' равно null. – EJP

0

Вы можете достичь желаемых результатов, используя java util concurrent. Создание фиксированного числа рабочих. Используйте вызов, чтобы инициировать чтение блока на SynchronousQueue. Поэтому, если все работники заняты одной работой каждый и будут заняты обработкой их (связь с сокетом), из SynchronousQueue не будет никаких чтений, и, следовательно, предложение для синхронной очереди не удастся. Проверка этого сбоя (что означает, что все работники с фиксированным числом заняты, теперь нет привязки к очереди), отклоните следующий запрос (ы).

Пример кода в следующих строках [Untested - исключенные исключения для краткости, пожалуйста, внесите необходимые изменения].

public class BoundedServer 
{ 
    public static void main(String[] args) 
    { 
     /** 
     * Port to serve 
     */ 
     final int port = 2013; 

     /** 
     * Max Workers 
     */ 
     final int maxworkers = 10; 

     /** 
     * The server socket. 
     */ 
     ServerSocket mServerSocket = null; 

     /** 
     * Queue of work units to process if there is a worker available. 
     */ 
     final SynchronousQueue<WorkUnit> mQueueToProcess = new SynchronousQueue<WorkUnit>(); 

     /** 
     * Queue of work units to reject if there is no current worker available. 
     */ 
     final LinkedBlockingQueue<WorkUnit> mQueueToReject = new LinkedBlockingQueue<WorkUnit>(); 

     /** 
     * A thread pool to handle the work. 
     */ 
     final ExecutorService communicationservice = Executors.newFixedThreadPool(maxworkers); 

     /** 
     * Let a single thread take care of rejecting the requests when needed to do so. 
     */ 
     final ExecutorService rejectionservice = Executors.newSingleThreadExecutor(); 

     try 
     { 
      Runnable communicationlauncher = new Runnable() 
      { 
       public void run() 
       { 
        try 
        { 
         /** 
         * Set of workers to handle the work. 
         */ 
         final CommunicationWorker[] workers = new CommunicationWorker[maxworkers]; 

         communicationservice.invokeAll(Arrays.asList(workers)); 
        } 
        finally 
        { 
         communicationservice.shutdown(); 
        } 
       } 
      }; 

      new Thread(communicationlauncher).start(); 

      Runnable rejectionlauncher = new Runnable() 
      { 
       public void run() 
       { 
        try 
        { 
         RejectionWorker rejectionworker = new RejectionWorker(mQueueToReject); 

         rejectionservice.submit(rejectionworker); 
        } 
        finally 
        { 
         rejectionservice.shutdown(); 
        } 
       } 
      }; 
      new Thread(rejectionlauncher).start(); 

      mServerSocket = new ServerSocket(port); 

      while(true) 
      { 
       WorkUnit work = new WorkUnit(mServerSocket.accept()); 

       if(!mQueueToProcess.offer(work)) 
       { 
        mQueueToReject.add(work); 
       } 
      } 
     } 
     finally 
     { 
      try 
      { 
       mServerSocket.close(); 
      } 
     } 
    } 
} 


public class WorkUnit 
{ 
    private Socket mSocket = null; 

    public WorkUnit(Socket socket) 
    { 
     super(); 
     this.setSocket(socket); 
    } 

    public Socket getSocket() { 
     return mSocket; 
    } 

    public void setSocket(Socket mSocket) { 
     this.mSocket = mSocket; 
    } 
} 

public class CommunicationWorker 
implements Callable<Boolean> 
{ 
    private SynchronousQueue<WorkUnit> mQueueToProcess; 

    public CommunicationWorker(SynchronousQueue<WorkUnit> queueToProcess) 
    { 
     super(); 
     this.mQueueToProcess = queueToProcess; 
    } 

    @Override 
    public Boolean call() throws Exception 
    { 
     while(true) 
     { 
      WorkUnit work = mQueueToProcess.take(); 

      Socket socket = work.getSocket(); 

      // Code to handle socket communication and closure. 
      // Once the communication is finished, this thread will get blocked to mQueueToProcess. 
     } 
    } 
} 


public class RejectionWorker 
implements Callable<Boolean> 
{ 
    private LinkedBlockingQueue<WorkUnit> mQueueToReject; 

    public RejectionWorker(LinkedBlockingQueue<WorkUnit> queueToReject) 
    { 
     super(); 
     this.mQueueToReject = queueToReject; 
    } 

    @Override 
    public Boolean call() throws Exception 
    { 
     while(true) 
     { 
      WorkUnit work = mQueueToReject.take(); 

      Socket socket = work.getSocket(); 

      // Code to reject the request. 
     } 
    } 
} 

 Смежные вопросы

  • Нет связанных вопросов^_^