2015-09-14 3 views
3

У меня есть класс шины сообщений, который использует Rx для ввода нескольких потоков событий в приложении WPF. Моя проблема ObserveOnDispatcher не вызывает обработчик событий в потоке пользовательского интерфейса.ObserveOnDispatcher доза не вызывает обработчик в потоке пользовательского интерфейса

Код:

private void button_Click(object sender, RoutedEventArgs e) 
{ 
    var messageBus = new MessageBus(); 
    messageBus.GetMessages<Message>().ObserveOnDispatcher().Subscribe(x => TestHanlder(x)); 

    Trace.WriteLine("Main Thread Id:" + Thread.CurrentThread.ManagedThreadId); 
    var deviceManager = new DeviceManager(messageBus); 
    deviceManager.Start(); 
} 

private void TestHanlder(Message message) 
{ 
    Trace.WriteLine("UI Handler ThreadId:" + Thread.CurrentThread.ManagedThreadId); 
} 

public class DeviceManager 
{ 
    private readonly MessageBus _messageBus; 

    public DeviceManager(MessageBus messageBus) 
    { 
    _messageBus = messageBus; 
    } 

    public void Start() 
    { 
    for (;;) 
    { 
     var t = Task.Factory.StartNew(() => BackGroundTask(), TaskCreationOptions.LongRunning); 
     t.Wait(); 
    } 
    } 

    private void BackGroundTask() 
    { 
    Thread.Sleep(1000); 
    Trace.WriteLine("Push ThreadId:" + Thread.CurrentThread.ManagedThreadId); 
    var message = new Message(); 
    _messageBus.Publish(message); 
    } 
} 

public class MessageBus 
{ 
    readonly ISubject<object> _messages; 

    public MessageBus() 
    { 
    _messages = new Subject<object>(); 
    } 
    public void Publish<TMessage>(TMessage message) 
    { 
    _messages.OnNext(message); 
    } 

    public IObservable<TMessage> GetMessages<TMessage>() 
    { 
    return _messages.OfType<TMessage>(); 
    } 
} 

public class Message 
{ 
    public Message() 
    { 

    } 
} 

Without ObserveOnDispatcher: 
messageBus.GetMessages<Message>().Subscribe(x => TestHanlder(x)); 
..........................Output................................. 
Main Thread Id:8 
Push ThreadId:9 
UI Handler ThreadId:9 
But I need to execute the TestHanlder function in the main thread or UI thread, in my use case above it must be the thread number 8. 

When I use ObserveOnDispatcher: 
messageBus.GetMessages<Message>().ObserveOnDispatcher().Subscribe(x => TestHanlder(x)); 
..........................Output................................. 

Main Thread Id:9 
Push ThreadId:10 
------------------> UI Handler ThreadId: are missing not there!? 
What I'm doing wrong here?!!!!! 
+0

Что случилось с моим кодом? Любая помощь приветствуется ... –

+1

Существует много классов «MessageBus». Если это приведет к сообщениям в рабочем потоке, то маловероятно, то ObserveOnDispatcher() будет использовать неправильный диспетчер. –

+0

@HansPassant хорошая точка Я думаю, что ObserveOnDispatcher использует неверный диспетчер, но как я могу это доказать? следует ли использовать другую реализацию для класса MessageBus? –

ответ

3

for (;;) { t.Wait(); } этот код выполняется в потоке пользовательского интерфейса и предотвращает его от выполнения ничего, переданное на него. ObserveOnDispatcher работает нормально, но ваш поток диспетчера заблокирован.

Если ввести асинхронный/Await (который выпустит нить), сценарий будет работать нормально:

private async void button_Click(object sender, RoutedEventArgs e) 
    { 
     var messageBus = new MessageBus(); 
     messageBus.GetMessages<Message>().ObserveOnDispatcher().Subscribe(x => TestHanlder(x)); 

     Trace.WriteLine("Main Thread Id:" + Thread.CurrentThread.ManagedThreadId); 
     var deviceManager = new DeviceManager(messageBus); 
     await deviceManager.Start(); 
    } 

    ... 

     public async Task<Unit> Start() 
     { 
      for (;;) 
      { 
       await Task.Factory.StartNew(() => BackGroundTask(), TaskCreationOptions.LongRunning); 
      } 
     }