2015-12-16 3 views
3

Акка-клиент documentation говорит:Акка-клиент процесс HttpRequests из различных соединений в одном потоке

Помимо относительно сокета, связанный на серверной стороне как Источника [IncomingConnection] и каждое соединение в качестве Источник [HttpRequest] с раковиной [HttpResponse]

Предположим, мы получаем объединенный источник, содержащий входящие соединения из многих источников [IncomingConnection].

Затем предположим, что мы получаем Source [HttpRequest] из Source [IncomingConnection] (см. Код ниже).

Тогда нет проблем, мы можем предоставить поток для преобразования HttpRequest в HttpResponse.

И вот в чем проблема - как мы можем правильно разорвать ответы? Как мы можем присоединиться к ответам на соединения?

Вся идея использования прецедента - это возможность приоритизировать обработку входящих запросов из разных соединений. Должен быть полезен во многих случаях, я думаю ...

Заранее благодарим!

Edit: Решение на основе ответа от @RamonJRomeroyVigil:

Серверный код:

val in1 = Http().bind(interface = "localhost", port = 8200) 
val in2 = Http().bind(interface = "localhost", port = 8201) 

val connSrc = Source.fromGraph(FlowGraph.create() { implicit b => 
    import FlowGraph.Implicits._ 

    val merge = b.add(Merge[IncomingConnection](2)) 

    in1 ~> print("in1") ~> merge.in(0) 
    in2 ~> print("in2") ~> merge.in(1) 

    SourceShape(merge.out) 
}) 

val reqSrc : Source[(HttpRequest, IncomingConnection), _] = 
    connSrc.flatMapConcat { conn => 
    Source.empty[HttpResponse] 
     .via(conn.flow) 
     .map(request => (request, conn)) 
    } 

val flow: Flow[(HttpRequest, IncomingConnection), (HttpResponse, IncomingConnection), _] = 
    Flow[(HttpRequest, IncomingConnection)].map{ 
     case (HttpRequest(HttpMethods.GET, Uri.Path("/ping"), _, entity, _), conn: IncomingConnection) => 
     println(s"${System.currentTimeMillis()}: " + 
      s"process request from ${conn.remoteAddress.getHostName}:${conn.remoteAddress.getPort}") 
     (HttpResponse(entity = "pong"), conn) 
    } 

reqSrc.via(flow).to(Sink.foreach { case (resp, conn) => 
    Source.single(resp).via(conn.flow).runWith(Sink.ignore) 
}).run() 

def print(prefix: String) = Flow[IncomingConnection].map { s => 
    println(s"$prefix [ ${System.currentTimeMillis()} ]: ${s.remoteAddress}"); s 
} 

Итак, я использую локон из консоли и видим следующее:

% curl http://localhost:8200/ping 
curl: (52) Empty reply from server 

Запрос второго витка не работает:

% curl http://localhost:8200/ping 
curl: (7) Failed to connect to localhost port 8200: Connection refused 

На консоли сервера я вижу следующее при отправке 1-ый запрос:

in1 [ 1450287301512 ]: /127.0.0.1:52461 
1450287301626: process request from localhost:52461 
[INFO] [12/16/2015 20:35:01.641] [default-akka.actor.default-dispatcher-6] [akka://default/system/IO-TCP-STREAM/server-1-localhost%2F127.0.0.1%3A8200] Message [akka.io.Tcp$Unbound$] from Actor[akka://default/system/IO-TCP/selectors/$a/0#119537130] to Actor[akka://default/system/IO-TCP-STREAM/server-1-localhost%2F127.0.0.1%3A8200#-1438663077] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. 
[INFO] [12/16/2015 20:35:01.641] [default-akka.actor.default-dispatcher-6] [akka://default/system/IO-TCP-STREAM/server-2-localhost%2F127.0.0.1%3A8201] Message [akka.io.Tcp$Unbound$] from Actor[akka://default/system/IO-TCP/selectors/$a/1#679898594] to Actor[akka://default/system/IO-TCP-STREAM/server-2-localhost%2F127.0.0.1%3A8201#1414174163] was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. 

И ничего при отправке 2-й запрос.

Итак, похоже, что есть некоторые проблемы с внутренним потоком связи (как указано @RamonJRomeroyVigil) или с чем-то еще ...

В основном код не работает.

Все еще исследуя проблему.

+1

Если вы преобразуете свой 'Source [IncomingConnection]' в 'Source [HttpRequest]', как бы вы отправили свой «HttpResponse» обратно клиенту? Как только вы сгенерировали ответ в потоковом конвейере, у вас больше нет дескриптора для IncomingConnection ... –

+1

@RamonJRomeroyVigil это вопрос ... Глядя на API, я не могу найти ответ, но логически логически - это не должно быть проблемой. Например: 1. мы имеем два связанных соединений: A и B 2. оба соединения объединены в С 3. С производит HTTPRequest экземпляры 4. некоторый поток обработки преобразует HttpRequest к HttpResponse 5. finallly каким-то образом, мы должны маршрут HttpResponse к правильному соединению - выглядит довольно логично и приемлемо. Итак, возможно, можно использовать некоторую оболочку HttpRequest, которая может включать экземпляр IncomingConnection, чтобы мы могли использовать его позже? –

+1

Я написал ответ с предостережением. Я рекомендую вам выделять потоки для разных приоритетов. Может быть, открыть два разных порта для двух разных уровней приоритета, а затем использовать пулы потоков различного размера для каждого из портов. –

ответ

0

Данное решение основано на дополнительной информации, предоставленной в комментариях к заданию.

Учитывая

val connSrc : Source[IncomingConnection,_] = ??? 

Метод flatMapConcat решает конкретный вопрос, сформулированный:

val reqSrc : Source[(HttpRequest, IncomingConnection), _] = 
    connSrc.flatMapConcat { conn => 
    Source.empty[HttpResponse] 
      .via(conn.flow) 
      .map(request => (request, conn)) 
    } 

Это обеспечивает источник (HttpRequest, IncomingConnection) кортежей.

Если у вас есть этап обработки, который преобразует запросы на акустический отклик

val flow : Flow[(HttpRequest, IncomingConnection), (HttpResponse, IncomingConnection), _] = ??? 

Вы можете отправить ответ обратно клиенту:

reqSrc.via(flow).to(Sink.foreach { case (resp, conn) => 
    Source.single(resp).via(conn.flow).runWith(Sink.ignore) 
}) 

Предупреждение: Это решение вызывает conn.flow дважды: один раз для создания потока, который генерирует запросы, и снова для создания потока для отправки ответов. Я не знаю, будет ли этот тип использования прервать что-то в логике IncomingConnection.