2017-02-08 11 views
3

Я пытаюсь отправить сообщение актеру bindet с Source.actorRef, но эта часть кода:mapMaterializedValue ничего не делать с Source.actorRef

println(s"Before mapping $src") 
src.mapMaterializedValue { ref => 
    println(s"Mapping $ref") 
    ref ! letter.text 
} 
println(s"After mapping $src") 

печатает только что-то вроде этого:

Перед отображением Источник (SourceShape (ActorRefSource.out), ActorRefSource (0, Нормально) [5564f412])
После отображения Источник (SourceShape (ActorRefSource.out), ActorRefSource (0, Нормально) [5564f412])

So. Каким-то образом картаMaterializedValue не делать ничего. Конечно, никакое сообщение для актера не отправлено. Is ref - Отсутствует почему-то?

Кроме того, я размещаю весь код. Это участок чего-то вроде простого мессенджера (одно-одно сообщение) на веб-сайтах. Сейчас я просто изучаю потоки Akka, поэтому этот код действительно не идеален. Я готов услышать любые критики или советы.

Главный объект сервер:

package treplol.server 

import treplol.common._ 

import akka.actor.{ActorRef, ActorSystem} 
import akka.http.scaladsl.Http 
import akka.http.scaladsl.model.ws._ 
import akka.http.scaladsl.server.Directives._ 
import akka.stream.scaladsl._ 
import akka.stream.{ActorMaterializer, FlowShape, OverflowStrategy} 

import scala.io.StdIn 
import java.util.UUID 

object WsServer extends App { 

    implicit val system = ActorSystem("example") 
    implicit val materializer = ActorMaterializer() 

    def createSource(uuid: UUID): Source[String, ActorRef] = { 
    val src = Source.actorRef[String](0, OverflowStrategy.fail) 
    sources(uuid) = src 
    src 
    } 

    val sources: collection.mutable.HashMap[UUID, Source[String, ActorRef]] = 
    collection.mutable.HashMap[UUID, Source[String, ActorRef]]() 
    val userSources: collection.mutable.HashMap[String, UUID] = 
    collection.mutable.HashMap[String, UUID]() 

    def flow: Flow[Message, Message, Any] = { 

    val uuid: UUID = UUID.randomUUID() 
    val incomingSource: Source[String, ActorRef] = createSource(uuid) 

    Flow.fromGraph(GraphDSL.create() { implicit b => 
     import GraphDSL.Implicits._ 

     val merge = b.add(Merge[String](2)) 

     val mapMsgToLttr = b.add(
     Flow[Message].collect { case TextMessage.Strict(txt) => txt } 
      .map[Letter] { txt => 
      WsSerializer.decode(txt) match { 
       case Auth(from) => 
       userSources(from) = uuid 
       Letter("0", from, "Authorized!") 
       case ltr: Letter => ltr 
      } 
      } 
    ) 

     val processLttr = b.add(
     Flow[Letter].map[String] { letter => 
      userSources.get(letter.to) flatMap sources.get match { 
      case Some(src) => 
       println(s"Before mapping $src") 
       src.mapMaterializedValue { ref => 
       println(s"Mapping $ref") 
       ref ! letter.text 
       } 
       println(s"After mapping $src") 
       "" 
      case None => "Not authorized!" 
      } 
     } 
    ) 

     val mapStrToMsg = b.add(
     Flow[String].map[TextMessage] (str => TextMessage.Strict(str)) 
    ) 

     mapMsgToLttr ~> processLttr ~> merge 
        incomingSource ~> merge ~> mapStrToMsg 

     FlowShape(mapMsgToLttr.in, mapStrToMsg.out) 
    }) 

    } 

    val route = path("ws")(handleWebSocketMessages(flow)) 
    val bindingFuture = Http().bindAndHandle(route, "localhost", 8080) 

    println(s"Server online at http://localhost:8080/\nPress RETURN to stop...") 
    StdIn.readLine() 

    import system.dispatcher 
    bindingFuture 
    .flatMap(_.unbind()) 
    .onComplete(_ => system.terminate()) 
} 

Общего пакет:

package treplol 

package object common { 

    trait WsMessage 
    case class Letter(from: String, to: String, text: String) extends WsMessage 
    case class Auth(from: String) extends WsMessage 

    object WsSerializer { 

    import org.json4s.{Extraction, _} 
    import org.json4s.jackson.JsonMethods.{compact, parse} 
    import org.json4s.jackson.Serialization 

    implicit val formats = { 
     Serialization.formats(NoTypeHints) 
    } 

    case class WsData(typeOf: String, data: String) 
    object WsDataType { 
     val LETTER = "letter" 
     val AUTH = "auth" 
    } 

    class WrongIncomingData extends Throwable 

    def decode(wsJson: String): WsMessage = parse(wsJson).extract[WsData] match { 
     case WsData(WsDataType.LETTER, data) => parse(data).extract[Letter] 
     case WsData(WsDataType.AUTH, data) => parse(data).extract[Auth] 
     case _ => throw new WrongIncomingData 
    } 

    def encode(wsMessage: WsMessage): String = { 
     val typeOf = wsMessage match { 
     case _: Letter => WsDataType.LETTER 
     case _: Auth => WsDataType.AUTH 
     case _ => throw new WrongIncomingData 
     } 
     compact(Extraction.decompose(
     WsData(typeOf, compact(Extraction.decompose(wsMessage))) 
    )) 
    } 
    } 

} 

build.sbt

name := "treplol" 

version := "0.0" 

scalaVersion := "2.12.1" 

resolvers += "Typesafe Releases" at "http://repo.typesafe.com/typesafe/releases" 

libraryDependencies ++= Seq(
    "com.typesafe.akka" %% "akka-actor" % "2.4.16", 
    "com.typesafe.akka" %% "akka-stream" % "2.4.16", 
    "com.typesafe.akka" %% "akka-http" % "10.0.3", 
    "org.json4s" %% "json4s-jackson" % "3.5.0" 
) 

Спасибо всего заранее!

+0

Похож, что 'mapMaterializedValue' возвращает новый источник. Ваш пример похож на выражение «val x = 1; Println (х); x + 3; Println (х) '. Попробуйте 'val src2 = src.mapMaterializedValue (...); println (src2) ' – Dylan

ответ

4

В соответствии с Документами, то mapMaterializedValue комбинатор

Transform только материализованный значение этого источника, в результате чего все другие свойства, как они были.

Материализованное значение доступно только после любой стадии графика (в данном случае, источник) является запустить. Вы никогда не используете свой источник в своем коде.

Обратите внимание, что Flow[Message, Message, Any], используемый для обработки сообщений WebSocket, фактически управляется инфраструктурой Akka-HTTP, поэтому вам не нужно делать это вручную. Тем не менее, Source, которые вы создаете в теле processLttr, не привязаны к остальной части графика и поэтому не запускаются.

Для получения дополнительной информации о выполняемых графиках и материализации обратитесь к docs.

+0

Спасибо, что ответили. Стефано! Но посмотрите ... Для каждого соединения в начале метода 'flow' я создаю' incomingSource' и помещаю его в hashmap 'sources' (в' createSource'). В конце 'flow'' incomingSource' добавляется в график. В 'processLttr' я беру уже материализованный источник из hashmap. По крайней мере, я ожидаю, что я это делаю. So. Где я ошибаюсь? – Sasha

+1

источник, который вы берете из хэшмапа, не «уже материализовался». Каждый источник является неизменным и может свободно использоваться. Когда вы запускаете источник, вы возвращаете его материализованное значение (его оригинальное или его отображаемое материализованное значение). И вы можете запускать источник столько раз, сколько хотите. –

0

Благодаря Stefano!

Но, похоже, нет способа добиться того, что я хотел с таким путем. Но я углубился и использовал custom stream processing and integration with actors. С помощью этой техники я могу передавать сообщения в определенный поток извне. (Эта функция еще экспериментальна!)