2016-12-07 4 views
1

Я слежу за учебным пособием akka java websocket в попытке создать сервер websocket. Я хочу, чтобы реализовать 2 дополнительные функции:akka websocket с java, подсчет количества клиентов, отправка сообщения клиенту

  • Будучи в состоянии отобразить количество подключенных клиентов, но результат всегда 0 или 1, даже если я знаю, что у меня есть 100 одновременно подключенных клиентов .
  • Связь через Интернет является двунаправленной. В настоящее время сервер отвечает только сообщением, когда клиент отправляет сообщение. Как начать отправку сообщения с сервера на клиент?

Вот оригинальный Akka ява пример сервера код с минимальной модификацией моего клиента подсчета реализации:

public class websocketServer { 
private static AtomicInteger connections = new AtomicInteger(0);//connected clients count. 

public static class MyTimerTask extends TimerTask { 
//called every second to display number of connected clients. 
    @Override 
    public void run() { 
     System.out.println("Conncurrent connections: " + connections); 
    } 
} 


//#websocket-handling 
    public static HttpResponse handleRequest(HttpRequest request) { 
     HttpResponse result; 
     connections.incrementAndGet(); 
     if (request.getUri().path().equals("/greeter")) { 
      final Flow<Message, Message, NotUsed> greeterFlow = greeter(); 
      result = WebSocket.handleWebSocketRequestWith(request, greeterFlow); 
     } else { 
      result = HttpResponse.create().withStatus(413); 
     } 
     connections.decrementAndGet(); 
     return result; 
    } 


public static void main(String[] args) throws Exception { 
    ActorSystem system = ActorSystem.create(); 
    TimerTask timerTask = new MyTimerTask(); 
    Timer timer = new Timer(true); 
    timer.scheduleAtFixedRate(timerTask, 0, 1000); 
    try { 
     final Materializer materializer = ActorMaterializer.create(system); 

     final Function<HttpRequest, HttpResponse> handler = request -> handleRequest(request); 
     CompletionStage<ServerBinding> serverBindingFuture = 
     Http.get(system).bindAndHandleSync(
      handler, ConnectHttp.toHost("****", 1183), materializer); 


     // will throw if binding fails 
     serverBindingFuture.toCompletableFuture().get(1, TimeUnit.SECONDS); 
     System.out.println("Press ENTER to stop."); 
     new BufferedReader(new InputStreamReader(System.in)).readLine(); 
     timer.cancel(); 
    } catch (Exception e){ 
     e.printStackTrace(); 
    } 
    finally { 
     system.terminate(); 
    } 
    } 

    //#websocket-handler 

    /** 
    * A handler that treats incoming messages as a name, 
    * and responds with a greeting to that name 
    */ 
    public static Flow<Message, Message, NotUsed> greeter() { 
    return 
     Flow.<Message>create() 
     .collect(new JavaPartialFunction<Message, Message>() { 
       @Override 
       public Message apply(Message msg, boolean isCheck) throws Exception { 
       if (isCheck) { 
        if (msg.isText()) { 
         return null; 
        } else { 
         throw noMatch(); 
        } 
       } else { 
        return handleTextMessage(msg.asTextMessage()); 
       } 
       } 
     }); 
    } 

    public static TextMessage handleTextMessage(TextMessage msg) { 
    if (msg.isStrict()) // optimization that directly creates a simple response... 
    { 
     return TextMessage.create("Hello " + msg.getStrictText()); 
    } else // ... this would suffice to handle all text messages in a streaming fashion 
    { 
     return TextMessage.create(Source.single("Hello ").concat(msg.getStreamedText())); 
    } 
    } 
    //#websocket-handler 
} 

ответ

0

адресации 2 пулевых пунктов ниже:

1 - вам необходимо прикрепить свои показатели к потоку сообщений - а не к потоку HttpRequest - для эффективного подсчета активных подключений. Вы можете сделать это, используя watchTermination. Пример кода для метода handleRequest ниже

public static HttpResponse handleRequest(HttpRequest request) { 
    HttpResponse result; 
    if (request.getUri().path().equals("/greeter")) { 
    final Flow<Message, Message, NotUsed> greeterFlow = greeter().watchTermination((nu, cd) -> { 
     connections.incrementAndGet(); 
     cd.whenComplete((done, throwable) -> connections.decrementAndGet()); 
     return nu; 
    }); 
    result = WebSocket.handleWebSocketRequestWith(request, greeterFlow); 
    } else { 
    result = HttpResponse.create().withStatus(413); 
    } 
    return result; 
} 

2 - для сервера самостоятельно отправлять сообщения, которые вы можете создать его поток сообщений с помощью Flow.fromSinkAndSource. Пример ниже (это отправит только одно сообщение):

public static Flow<Message, Message, NotUsed> greeter() { 
    return Flow.fromSinkAndSource(Sink.ignore(), 
    Source.single(new akka.http.scaladsl.model.ws.TextMessage.Strict("Hello!")) 
); 
} 
+0

Благодарим за отзыв watchTermination(), клиентский счетчик работает правильно. Однако для сервера, отправляющего сообщение независимо, он все еще полагается на то, что клиент отправляет какое-то сообщение сначала, прежде чем отвечать (связь происходит один раз). Нет ли способа сохранить запись текущего клиента, подключенного к сети, и отправить сообщение, когда у сервера есть какая-то новая информация для доставки клиенту? что-то вроде строк clients.getClient («3353»). send (new akka.http.scaladsl.model.ws.TextMessage.Strict («Hello!»); – Mike

0

В handleRequest методе вы инкремента и затем уменьшаем счетчик connections, так что в конце концов значение всегда 0.

public static HttpResponse handleRequest(HttpRequest request) { 
     ... 
     connections.incrementAndGet(); 
     ... 
     connections.decrementAndGet(); 
     return result; 
    }