2016-12-26 7 views
2

Я пытаюсь создать простой tcp-сервер, используя потоки Akka.Разрешить одно соединение с потоком Akka как tcp-сервер

Tcp() 
    .bind(props.host, props.port) 
    .to(Sink.foreach(_.handleWith(handler))) 
    .run() 
    .onComplete { 
    case Success(i) => logger.info(s"Server is bound at ${props.host}:${props.port}") 
    case Failure(e) => logger.error("Server binding failure", e) 
    } 

Я хочу разрешить максимум одно соединение за раз. Для этого я добавил следующую строку в файл application.conf.

akka.io.tcp.max-channels = 2 

При такой конфигурации akka допускает только одно соединение за раз. Однако, как только вторая попытка подключения он отклоняет запрос и не себя со следующим сообщением:

Could not register incoming connection since selector capacity limit is reached, closing connection 

С этой точкой, не представляется возможным установить любое соединение, так как сервер Tcp вниз.

Вопрос: Каков правильный способ включения только одного соединения за раз? Основная цель - ответить на первый запрос на соединение и отклонить других, пока он все еще продолжается. После того, как предыдущее соединение будет закрыто, снова можно сделать другое соединение. Как я уже упоминал, только одно соединение должно быть разрешено в любое время.

BONUS: Можно ли предоставить белый список, чтобы поток akka принимал соединения только из этого списка? Я планирую разрешить только известные ip-адреса для подключения моего сервера. Чтобы достичь этого, я думаю, что достаточно знать правильный способ отклонения запроса. Поэтому я могу сравнить IP-адрес входящего соединения с данным списком и отклонить, если он там отсутствует. Но любое лучшее решение также приветствуется.

+0

Что вы хотите сделать с тем соединением, которое приходит после обработки? Если ваша цель - просто отказаться от предстоящих подключений, простой способ, я вижу, это ввести какой-то семафор, который указывает количество процессов запросов одновременно, но вместо того, чтобы блокировать, просто отклоните новые соединения. – maks

+0

Что касается BONUS: вы хотите, чтобы этот белый список на уровне сети или приложения? – maks

+0

Не работали с потоками, но как подсказка могла сказать, что после 'bind' вы получаете' Source' 'Tcp.IncomingConnection', которые имеют возможность получить адрес. После этого вы можете каким-то образом отказаться от этого соединения, но это зависит от того, что является отказом в ваших условиях. Это может быть просто другой обработчик, который записывает некоторые данные об ошибках во входящее соединение – maks

ответ

1

Метод привязки Tcp имеет параметр options, который принимает опции «Траверсировка сокета». Вы можете передать л, как это к тому параметра Я:

case class AllowedAddresses(addresses: Seq[InetAddress]) extends SocketOption { 
    override def beforeConnect(s: Socket): Unit = { 
     if (!addresses.contains(s.getInetAddress)) s.close() 
    } 
    } 

так что ваш код будет выглядеть следующим образом:

Tcp() 
    .bind(props.host, props.port, options = List(AllowedAddresses(listOfAddresses))) 
    .to(Sink.foreach(_.handleWith(handler))) 
    .run() 
    .onComplete { 
    case Success(i) => logger.info(s"Server is bound at ${props.host}:${props.port}") 
    case Failure(e) => logger.error("Server binding failure", e) 
    } 

подход ограничения числа запроса такой же, исследовать методы в SocketOptions признака

PS. Не пробовал это, чтобы запустить, только что завершившийся после исследования потока API, поэтому, пожалуйста, проверьте правильность.