Я пытаюсь подключить абонента реактивированного потока к источнику akka.Невозможно использовать recivestream Абонент с источниками потока akka
Мой источник, кажется, работает нормально с простой раковиной (например, foreach) - но если я положил реальную раковину, сделанную от подписчика, я ничего не получаю.
Мой контекст:
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import org.reactivestreams.{Subscriber, Subscription}
implicit val system = ActorSystem.create("test")
implicit val materializer = ActorMaterializer.create(system)
class PrintSubscriber extends Subscriber[String] {
override def onError(t: Throwable): Unit = {}
override def onSubscribe(s: Subscription): Unit = {}
override def onComplete(): Unit = {}
override def onNext(t: String): Unit = {
println(t)
}
}
и мой тестовый пример:
val subscriber = new PrintSubscriber()
val sink = Sink.fromSubscriber(subscriber)
val source2 = Source.fromIterator(() => Iterator("aaa", "bbb", "ccc"))
val source1 = Source.fromIterator(() => Iterator("xxx", "yyy", "zzz"))
source1.to(sink).run()(materializer)
source2.runForeach(println)
Я получаю выход:
aaa
bbb
ccc
Почему я не могу получить ххх, ууу, и ZZZ?