2016-11-17 3 views
2

Я очень новичок в Rx и пытаюсь обвести вокруг него голову. Не читали много, но сначала пытались пройти в лаборатории.Rx.net - синхронный против асинхронных наблюдателей - зависит от источника?

class Program 
{ 
    static void Main(string[] args) 
    { 
     // one source, produces values with delays 
     IObservable<int> source = Observable.Generate(0, i => i < 2, i => ++i, i => i*i, i => TimeSpan.FromMilliseconds(100)); 
     IObserver<int> handler = null; 

     IDisposable subscription = source.Subscribe(
      i => 
      { 
       Console.WriteLine("Sub 1 [tid:{0}] received {1} from source", Thread.CurrentThread.ManagedThreadId,i); 
       Thread.Sleep(500); 
      }, 
      exception => Console.WriteLine("Sub 1 Something went wrong {0}", exception), 
      () => Console.WriteLine("Sub 1 Completed observation")); 

     IDisposable subscription2 = source.Subscribe(
      i => Console.WriteLine("Sub 2 [tid:{0}] received {1} from source", Thread.CurrentThread.ManagedThreadId, i), 
      exception => Console.WriteLine("Sub 2 Something went wrong {0}", exception), 
      () => Console.WriteLine("Sub 2 Completed observation")); 

     Console.WriteLine("press to cancel"); 
     Console.ReadLine(); 
     subscription.Dispose(); 
     subscription2.Dispose(); 

    } 
} 

Это производит асинхронное чередование, как ожидалось.

С другой стороны, если я изменяю источник синхронным, даже наблюдатели становятся блокирующими и синхронными (один и тот же идентификатор потока, не переходит к суб2 без полного потребления sub1). Может кто-нибудь помочь мне понять это? Вот синхронизации версия

class Program 
{ 
    static void Main(string[] args) 
    { 
     // one source, produces values 
     IObservable<int> source = Observable.Generate(0, i => i < 2, i => ++i, i => i*i); 
     IObserver<int> handler = null; 

     // two observers that consume - first with a delay and the second immediately. 
     // in this case, the behavior of the observers becomes synchronous? 
     IDisposable subscription = source.Subscribe(
      i => 
      { 
       Console.WriteLine("Sub 1 [tid:{0}] received {1} from source", Thread.CurrentThread.ManagedThreadId,i); 
       Thread.Sleep(500); 
      }, 
      exception => Console.WriteLine("Sub 1 Something went wrong {0}", exception), 
      () => Console.WriteLine("Sub 1 Completed observation")); 

     IDisposable subscription2 = source.Subscribe(
      i => Console.WriteLine("Sub 2 [tid:{0}] received {1} from source", Thread.CurrentThread.ManagedThreadId, i), 
      exception => Console.WriteLine("Sub 2 Something went wrong {0}", exception), 
      () => Console.WriteLine("Sub 2 Completed observation")); 

     Console.WriteLine("press to cancel"); 
     Console.ReadLine(); 
     subscription.Dispose(); 
     subscription2.Dispose(); 

    } 
} 

ответ

2

Я считаю, что причина в том, выбранный по умолчанию IScheduler для оператора. Взгляните на принятый ответ here.

Для Generate это зависит от перегрузки. Основываясь на ответе, это используемые планировщики по умолчанию. Вы можете проверить их источник, если вам нравится

  • По умолчанию IScheduler для временного оператора DefaultScheduler.Instance
  • По умолчанию IScheduler для последнего оператора CurrentThreadScheduler.Instance

Вы можете подтвердить это путем предоставления «не- блокировка "в вашей версии синхронизации

IObservable<int> source = Observable.Generate(0, i => i < 2, i => ++i, i => i * i, DefaultScheduler.Instance);

+0

Похоже, если вы перегружаетесь с интервалом времени, выберите планировщик «Scheduler.Default», тогда как те, у кого нет времени, используйте «Scheduler.Immediate» – Raghu