2017-01-08 4 views
0

В следующем коде я превращаюсь сокет TCP в Observable[Array[Byte]]:Перезапустите Наблюдаемые подключен к ресурсу

import rx.lang.scala.Observable 
import rx.lang.scala.schedulers.IOScheduler 

val sock = new Socket 
type Bytes = Array[Byte] 

lazy val s: Observable[Bytes] = Obs.using[Bytes, Socket] { 
    sock.connect(new InetSocketAddress("10.0.2.2", 9002), 1000) 
    sock 
}(
    socket => Observable.from[Bytes] { 

    val incoming = socket.getInputStream 
    val buffer = new Bytes(1024) 

    Stream.continually { 
     val read = incoming.read(buffer, 0, 1024) 
     buffer.take(read) 
    }.takeWhile(_.nonEmpty) 

    }, 

    socket => { 
    println("Socket disposed") 
    socket.close 
    s.retry // Does not work 
    }) 
    .subscribeOn(IOScheduler.apply) 

s.subscribe(bytes => println(new String(bytes, "UTF-8")), println) 

Подключение к удаленному серверу может быть прерван в любой момент и в этом случае я, как Observable чтобы попытаться снова подключиться, но s.retry ничего не делает. Как я могу это достичь? Также может ли быть сделано «внутри» текущего Observable без создания нового и повторной подписки?

ответ

1

Вы хотите настроить новое соединение сокета для каждой новой подписки. Это проще всего с (A)SyncOnSubscribe, портировано на RxScala с версии 0.26.5. У вас есть это наблюдение, вы можете использовать обычные методы контроля ошибок, такие как .retry.

Что-то вроде этого:

val socketObservable: Observable[Byte] = Observable.create(SyncOnSubscribe.singleState(
    generator =() => 
    sock 
     .connect(new InetSocketAddress("10.0.2.2", 9002), 1000) 
     .getInputStream 
)(next = is => Try(is.read()) match { 
    case Success(-1) => Notification.OnCompleted() 
    case Success(byte) => Notification.OnNext(byte) 
    case Failure(e) => Notification.OnError(e) 
    }, 
    onUnsubscribe = is => Try(is.close) 
) 

Примечание: это читает один байт в то время и не очень эффективным. Вы можете улучшить это с помощью ASyncOnSubscribe или иметь каждое событие вашего наблюдаемого - массив байтов.

Примечание: это холодное наблюдение и создаст новый разъем для каждого абонента. Например, это откроет 2 гнезда:

socketObservable.foreach(b => System.out.print(b)) 
socketObservable.buffer(1024).foreach(kiloByte => System.out.println(kiloByte)) 

Если это не то, что вы хотите, вы можете превратить его в горячую с .share