2015-07-09 3 views
2

Я просто пытаюсь понять концепции между горячим и холодным наблюдаемым и опробовать библиотеку Monifu. Я понимаю, что следующий код должен привести только к тому, что один из абонентов получит события, выпущенные Observable, но это не так!Scala Rx Наблюдаемый с помощью Monifu

scala> :paste 
// Entering paste mode (ctrl-D to finish) 

import monifu.reactive._ 
import scala.concurrent.duration._ 

import monifu.concurrent.Implicits.globalScheduler 

val obs = Observable.interval(1.second).take(10) 

val x = obs.foreach(a => println(s"from x ${a}")) 
val y = obs.foreach(a => println(s"from y ${a}")) 

// Exiting paste mode, now interpreting. 

from x 0 
from y 0 
import monifu.reactive._ 
import scala.concurrent.duration._ 
import monifu.concurrent.Implicits.globalScheduler 
obs: monifu.reactive.Observable[Long] = [email protected] 
x: Unit =() 
y: Unit =() 

scala> from x 1 
from y 1 
from x 2 
from y 2 
from x 3 
from y 3 
from x 4 
from y 4 
from x 5 
from y 5 
from x 6 
from y 6 
from x 7 
from y 7 
from x 8 
from y 8 
from x 9 
from y 9 

Итак, это для меня похоже, что Observable публикует события всем заинтересованным абонентам?

ответ

2

Я являюсь основным автором Monifu.

A холодных наблюдаемые означает, что его подписывается функция инициирует новый источник данных для каждого абонента (на каждый subscribe() вызова), в то время как горячей наблюдаемого в том же обмене данных между источником несколько абонентами.

В качестве примера рассмотрим файл, который будет источником данных. Позволяет моделировать простое наблюдение, которое испускает линии из файла:

def fromFile(file: File): Observable[String] = { 
    // this is the subscribe function that 
    // we are passing to create ;-) 
    Observable.create { subscriber => 
    // executing things on our thread-pool 
    subscriber.scheduler.execute { 
     val source = try { 
     Observable.fromIterable(scala.io.Source 
      .fromFile(file).getLines().toIterable) 
     } 
     catch { 
     // subscribe functions must be protected 
     case NonFatal(ex) => 
      Observable.error(ex) 
     } 

     source.unsafeSubscribe(subscriber) 
    } 
    } 
} 

Эта функция создает наблюдаемый холод. Это означает, что он откроет новый дескриптор файла для каждого подписанного наблюдателя, а затем прочитает и испустит строки для каждого подписанного наблюдателя.

Но мы можем превратить его в горячий наблюдаемым:

// NOTE: publish() turns a cold observable into a hot one 
val hotObservable = fromFile(file).publish() 

И тогда разница будет, когда вы делаете это:

val x = observable.subscribe() 
val y = observable.subscribe() 

Если наблюдаемая горяч:

  1. наблюдаемый ничего не делает, пока не позвоните connect() на нем
  2. после connect(), тот же файл открывается, и оба получат точно такие же события
  3. после того, как все строки из этого файла будут выпущены, а новые подписчики будут ничего не получить, потому что (общий) источник данных уже был обедненного

Если наблюдаемая холодная:

  1. на каждой подписки, новый дескриптор файла открыт и читать
  2. элементы выбрасываются сразу после subscribe(), поэтому нет необходимости ждать, подписываться connect()
  3. все наблюдатели будут получать все строки из этого файла, Вне зависимости от того момента, как сделать так

Некоторые ссылки, которые также относятся к Monifu:

  1. Connectable Observable from RxJava's wiki
  2. Intro to Rx: Hot and Cold Observables
  3. Subjects from RxJava's wiki
+0

Спасибо Nedelcu! Это помогло мне понять основы! – sparkr