Я очень новичок в Rx и пытаюсь обвести вокруг него голову. Не читали много, но сначала пытались пройти в лаборатории.Rx.net - синхронный против асинхронных наблюдателей - зависит от источника?
class Program
{
static void Main(string[] args)
{
// one source, produces values with delays
IObservable<int> source = Observable.Generate(0, i => i < 2, i => ++i, i => i*i, i => TimeSpan.FromMilliseconds(100));
IObserver<int> handler = null;
IDisposable subscription = source.Subscribe(
i =>
{
Console.WriteLine("Sub 1 [tid:{0}] received {1} from source", Thread.CurrentThread.ManagedThreadId,i);
Thread.Sleep(500);
},
exception => Console.WriteLine("Sub 1 Something went wrong {0}", exception),
() => Console.WriteLine("Sub 1 Completed observation"));
IDisposable subscription2 = source.Subscribe(
i => Console.WriteLine("Sub 2 [tid:{0}] received {1} from source", Thread.CurrentThread.ManagedThreadId, i),
exception => Console.WriteLine("Sub 2 Something went wrong {0}", exception),
() => Console.WriteLine("Sub 2 Completed observation"));
Console.WriteLine("press to cancel");
Console.ReadLine();
subscription.Dispose();
subscription2.Dispose();
}
}
Это производит асинхронное чередование, как ожидалось.
С другой стороны, если я изменяю источник синхронным, даже наблюдатели становятся блокирующими и синхронными (один и тот же идентификатор потока, не переходит к суб2 без полного потребления sub1). Может кто-нибудь помочь мне понять это? Вот синхронизации версия
class Program
{
static void Main(string[] args)
{
// one source, produces values
IObservable<int> source = Observable.Generate(0, i => i < 2, i => ++i, i => i*i);
IObserver<int> handler = null;
// two observers that consume - first with a delay and the second immediately.
// in this case, the behavior of the observers becomes synchronous?
IDisposable subscription = source.Subscribe(
i =>
{
Console.WriteLine("Sub 1 [tid:{0}] received {1} from source", Thread.CurrentThread.ManagedThreadId,i);
Thread.Sleep(500);
},
exception => Console.WriteLine("Sub 1 Something went wrong {0}", exception),
() => Console.WriteLine("Sub 1 Completed observation"));
IDisposable subscription2 = source.Subscribe(
i => Console.WriteLine("Sub 2 [tid:{0}] received {1} from source", Thread.CurrentThread.ManagedThreadId, i),
exception => Console.WriteLine("Sub 2 Something went wrong {0}", exception),
() => Console.WriteLine("Sub 2 Completed observation"));
Console.WriteLine("press to cancel");
Console.ReadLine();
subscription.Dispose();
subscription2.Dispose();
}
}
Похоже, если вы перегружаетесь с интервалом времени, выберите планировщик «Scheduler.Default», тогда как те, у кого нет времени, используйте «Scheduler.Immediate» – Raghu