Я хочу отправлять уведомления клиентам через websockets. Эти уведомления генерируются субъектами, поэтому я пытаюсь создать поток сообщений актера при запуске сервера и подписываться на соединения веб-сайтов с этим потоком (отправка только тех уведомлений, которые были отправлены с момента подписки)Как подписаться на веб-сайты для сообщений актера, используя потоки Akka и Akka HTTP?
С Source.actorRef мы можем создать источник актерские сообщения.
val ref = Source.actorRef[Weather](Int.MaxValue, fail)
.filter(!_.raining)
.to(Sink foreach println)
.run()
ref ! Weather("02139", 32.0, true)
Но как я могу подписаться (Акка HTTP *) WebSockets подключения к этому источнику, если был материализовался уже?
* WebSockets соединение в Акко HTTP требует поток [сообщение, сообщения, любое]
Что я пытаюсь сделать что-то вроде
// at server startup
val notifications: Source[Notification,ActorRef] = Source.actorRef[Notificacion](5,OverflowStrategy.fail)
val ref = notifications.to(Sink.foreach(println(_))).run()
val notificationActor = system.actorOf(NotificationActor.props(ref))
// on ws connection
val notificationsWS = path("notificationsWS") {
parameter('name) { name ⇒
get {
onComplete(flow(name)){
case Success(f) => handleWebSocketMessages(f)
case Failure(e) => throw e
}
}
}
}
def flow(name: String) = {
val messages = notifications filter { n => n.name equals name } map { n => TextMessage.Strict(n.data) }
Flow.fromSinkAndSource(Sink.ignore, notifications)
}
Это doensn't работа, так как источник уведомлений это не тот, который материализуется, поэтому он не излучает никакого элемента.
Примечание: Я использую Source.actorPublisher и она работает, но ktoso discourages his usage, а также я получаю эту ошибку:
java.lang.IllegalStateException: onNext is not allowed when the stream has not requested elements, totalDemand was 0.