2017-02-20 36 views
0

У меня есть следующий фрагмент кода:параметр Akka Streams + Akka Http Pass через поток

case class SomeClass(param1:String,param2:String,param3:String) 

    val someClassActorSource: Source[SomeClass, ActorRef] = Source 
     .actorPublisher[SomeClass](Props[SomeClassActorPublisher]) 

    val someFlow: ActorRef = Flow[SomeClass] 

     .mapAsync(3)(f=> getDocumentById(f)) 

     .map(f =>{ 
      val request = HttpRequest(method = HttpMethods.POST, uri = "http://localhost:8000/test") 
      .withEntity(ContentTypes.`text/xml(UTF-8)`, ByteString(f.a) 
      ) 
      (request,request) 

     }).via(connection) 

     //Parsing Response 
     .mapAsync(3){ 
      case (Success(HttpResponse(status, _, entity, _)),request)=> 
      entity.dataBytes.runFold(ByteString(""))(_ ++ _) 
     } 
     .map(resp =>parse(resp.utf8String,?????????????)) 
     .to(Sink.someSink{....}) 
     .runWith(someClassActorSource) 

    def parse(resp:String,parseParam:String)=???? 

и где-то в коде я посылающего сообщение для Flow:

someflow ! SomeClass("a","b","c") 
someflow ! SomeClass("a1","b1","c1") 

Моя проблема заключается что метод синтаксический анализ следует использовать param2 из оригинального корпуса класса

Так за первое сообщение должно быть

parse(response,"b") 

и для второго сообщения должно быть

parse(response,"b1") 

Таким образом, вопрос, как я могу извлечь параметр из метода я представил на поток?

ответ

1

Если предположить, что значение connection в настоящее время экземпляр через

val connection = Http().cachedHostConnectionPool(...) 

Вы можете использовать тот факт, что соединение имеет в кортеже, и вместо того, чтобы просто передавая request дважды в кортеже вы можете передать в введенном SomeClass. Этот экземпляр SomeClass должен будет пройти каждый из ваших значений Flow, чтобы перейти на этап синтаксического анализа.

Изменение ваш код немного:

val getDocumentFlow = 
    Flow[SomeClass].mapAsync(3)(f => getSomDocumentById(f).map(d => d -> f)) 

Ваш вопрос не указывается тип возвращаемого getDocumentById, так что я только с помощью Document:

val documentToRequest = 
    Flow[(Document, SomeClass)] map { case (document, someClass) => 
    val request = ... 

    (request, someClass) 
    } 

val parseResponse = 
    Flow[(Try[HttpResponse], SomeClass)].mapAsync(3){ 
    case (Success(HttpResponse(status, _, entity, _)), someClass) => 
     entity 
     .dataBytes 
     .runFold(ByteString(""))(_ ++ _) 
     .map(e => e -> someClass) 
    } 

val parseEntity = Flow[(ByteString, SomeClass)] map { 
    case (entity, someClass) => parse(entity.utf8String, someClass) 
} 

Эти потоки могут быть затем использованы в качестве описанных в вопросе:

val someFlow = 
    someClassActorSource 
    .via(getDocumentFlow) 
    .via(documentToRequest) 
    .via(connection) 
    .via(parseResponse) 
    .via(parseEntity) 
    .to(Sink.someSink{...}) 
    .run()