2017-02-06 12 views
0

Я хочу отправлять уведомления клиентам через websockets. Эти уведомления генерируются субъектами, поэтому я пытаюсь создать поток сообщений актера при запуске сервера и подписываться на соединения веб-сайтов с этим потоком (отправка только тех уведомлений, которые были отправлены с момента подписки)Как подписаться на веб-сайты для сообщений актера, используя потоки Akka и Akka HTTP?

С Source.actorRef мы можем создать источник актерские сообщения.

val ref = Source.actorRef[Weather](Int.MaxValue, fail) 
       .filter(!_.raining) 
       .to(Sink foreach println) 
       .run() 

ref ! Weather("02139", 32.0, true) 

Но как я могу подписаться (Акка HTTP *) WebSockets подключения к этому источнику, если был материализовался уже?

* WebSockets соединение в Акко HTTP требует поток [сообщение, сообщения, любое]

Что я пытаюсь сделать что-то вроде

// at server startup 
val notifications: Source[Notification,ActorRef] = Source.actorRef[Notificacion](5,OverflowStrategy.fail) 
val ref = notifications.to(Sink.foreach(println(_))).run() 
val notificationActor = system.actorOf(NotificationActor.props(ref)) 

// on ws connection 
val notificationsWS = path("notificationsWS") { 
    parameter('name) { name ⇒ 
    get { 
     onComplete(flow(name)){ 
     case Success(f) => handleWebSocketMessages(f) 
     case Failure(e) => throw e 
     } 
    } 
    } 
} 

def flow(name: String) = { 
    val messages = notifications filter { n => n.name equals name } map { n => TextMessage.Strict(n.data) } 
    Flow.fromSinkAndSource(Sink.ignore, notifications) 
} 

Это doensn't работа, так как источник уведомлений это не тот, который материализуется, поэтому он не излучает никакого элемента.

Примечание: Я использую Source.actorPublisher и она работает, но ktoso discourages his usage, а также я получаю эту ошибку:

java.lang.IllegalStateException: onNext is not allowed when the stream has not requested elements, totalDemand was 0. 

ответ

1

Вы можете выставить материализованная actorRef к некоторому внешнему маршрутизатору с помощью актера mapMaterializedValue.

Flow.fromSinkAndSourceMat(Sink.ignore, notifications)(Keep.right) 
    .mapMaterializedValue(srcRef => router ! srcRef) 

Маршрутизатор может отслеживать ваши источники actorrefs (Deathwatch может помочь в порядок вещи) и вперед сообщения на них.

NB: вы, вероятно, уже знаете, но обратите внимание, что с помощью Source.actorRef для подачи потока ваш поток не будет воспринимать противодавление (стратегия, которую вы выбрали, будет просто повреждена под нагрузкой).