В следующем коде я превращаюсь сокет TCP в Observable[Array[Byte]]
:Перезапустите Наблюдаемые подключен к ресурсу
import rx.lang.scala.Observable
import rx.lang.scala.schedulers.IOScheduler
val sock = new Socket
type Bytes = Array[Byte]
lazy val s: Observable[Bytes] = Obs.using[Bytes, Socket] {
sock.connect(new InetSocketAddress("10.0.2.2", 9002), 1000)
sock
}(
socket => Observable.from[Bytes] {
val incoming = socket.getInputStream
val buffer = new Bytes(1024)
Stream.continually {
val read = incoming.read(buffer, 0, 1024)
buffer.take(read)
}.takeWhile(_.nonEmpty)
},
socket => {
println("Socket disposed")
socket.close
s.retry // Does not work
})
.subscribeOn(IOScheduler.apply)
s.subscribe(bytes => println(new String(bytes, "UTF-8")), println)
Подключение к удаленному серверу может быть прерван в любой момент и в этом случае я, как Observable
чтобы попытаться снова подключиться, но s.retry
ничего не делает. Как я могу это достичь? Также может ли быть сделано «внутри» текущего Observable
без создания нового и повторной подписки?