1

Я пытаюсь использовать Reactive extensions с Oracle AQ. Когда сообщение появляется в Очереди Oracle, он запускает «OracleAQMessageAvailableEvent», который сообщает потребителю, что есть сообщение. Внутри OracleAQMessageAvailableEventHandler потребитель вызывает OracleAQQueue.Dequeue() для извлечения сообщения.Как создать Hot наблюдаемый в реактивном расширении

У меня есть выше работающий с RX. Ниже приведен код, который я использовал.

var messages = Observable.FromEventPattern<OracleAQMessageAvailableEventHandler, OracleAQMessageAvailableEventArgs> (
        h => _queue.MessageAvailable += h, h => _queue.MessageAvailable -= h) 
       .Where(x => x.EventArgs.AvailableMessages > 0) 
       .Select(x => 
       { 
        OracleAQMessage msg = _queue.Dequeue(); 
        return (UpdateMsg) msg.Payload; 
       }); 
messages.subscribe(....) 

Проблема заключается в том, что если я подписаться на сообщения, как только все работает, но если я подписаться на сообщения несколько раз (то есть несколько потребителей внутри моего приложения), то каждый потребитель будет пытаться назвать «_queue.Dequeue()» и каждый вызов после первого вызова не будет, если у нас нет нового сообщения.

Может ли кто-нибудь указать мне, что мне делать. Я думаю, мой сценарий для Горячего Наблюдения, но я изо всех сил пытаюсь обнять его.

ответ

3

Я думаю, что вы правы, что ищете Горячую Наблюдаемую. Если мы следуем коду, может быть более понятно, почему вы видите, что _queue.Dequeue(); вызывается несколько раз.

Сначала вы подписаться на событие от Oracle

Observable.FromEventPattern<OracleAQMessageAvailableEventHandler, OracleAQMessageAvailableEventArgs> (
    h => _queue.MessageAvailable += h, 
    h => _queue.MessageAvailable -= h) 

Это так же, как закреплять обработчик событий, как вы бы в мире до Rx. Все, кто прослушивает (подписывается), получат те же самые события. Если они подписываются после того, как событие поднято, то они пропустили его.

Затем вы отфильтровываете пустые наборы.

.Where(x => x.EventArgs.AvailableMessages > 0) 

Ничего особенного.

Затем вы выполняете побочный эффект внутри вашего запроса.

.Select(x => 
    { 
     OracleAQMessage msg = _queue.Dequeue(); 
     return (UpdateMsg) msg.Payload; 
    }); 

Побочный эффект в том, что вы делаете деструктивное чтение (Dequeue). Все подписчики, когда они выталкивают событие из восходящего потока _queue.MessageAvailable, будут пытаться позвонить Dequeue().

Чтобы избежать всех подписчиков, вызывающих побочный эффект, вы можете сделать последовательность Hot (как вы предложили). Для этого вы можете посмотреть оператор Publish().

Оператор Publish() вернет вам IConnectableObservable<T>, который просто расширяет IObservable<T>, добавив метод Connect(). Это позволяет осуществлять мелкозернистый контроль, когда выполняется логика подписки. Однако, это может быть слишком много контроля для вас, и вы, вероятно, найдете, что RefCount() будет именно тем, что вам нужно.

Observable.FromEventPattern<OracleAQMessageAvailableEventHandler, OracleAQMessageAvailableEventArgs> (
    h => _queue.MessageAvailable += h, 
    h => _queue.MessageAvailable -= h) 
.Where(x => x.EventArgs.AvailableMessages > 0) 
.Select(x => 
    { 
     OracleAQMessage msg = _queue.Dequeue(); 
     return (UpdateMsg) msg.Payload; 
    }) 
.Publish() 
.Refcount(); 

Теперь каждый из ваших абонентов будут получать такое же сообщение, и ваш Dequeue() побочный эффект будет вызван только один раз в случае (и только тогда, когда есть абоненты).

горячей и холодной наблюдаемыми являются покрыты here

+0

Спасибо за ваш комментарий, но он по-прежнему пытается вызвать _queue.Dequeue от нескольких подписчиков. Есть идеи? – tangokhi

+0

Как вы думаете, я должен использовать объект вместо Observable.FromEventPattern. Пользователи могут подписаться на тему, которая отображается как IObservable. Когда сообщение получено из OracleAQ и ушел с обработчиком, я могу удалить сообщение из одного сообщения и вызвать Subject.OnNext (NewMessage), который будет принимать несколько подписчиков? – tangokhi

+0

Хотя я думаю, что теперь все в порядке (с вашим ответом ниже), я думал, что отвечу - Нет, я не думаю, что вы должны использовать тему. Я редко думаю, что кто-то должен использовать тему. Обычно это указывает на дефект дизайна. –

-1

Lee Campbell, К сожалению мой плохой. Решение, о котором вы говорили, работает. На самом деле, я использовал его неправильно. У меня есть класс QueueWrapper, который имеет свойство Message. Я имел эту реализацию сообщения

public IObservable<UpdateMsg> Messages { 
     get { return Observable.FromEventPattern<OracleAQMessageAvailableEventHandler,   OracleAQMessageAvailableEventArgs> (
     h => _queue.MessageAvailable += h, 
     h => _queue.MessageAvailable -= h) 
     .Where(x => x.EventArgs.AvailableMessages > 0) 
     .Select(x => 
     { 
      OracleAQMessage msg = _queue.Dequeue(); 
      return (UpdateMsg) msg.Payload; 
     }) 
     .Publish() 
     .Refcount(); 
}} 

и мой код клиента был подписываться с помощью Messages свойства, как этот

// First Subscription 
_queueWrapper.Messages.Subscribe(....) 

// Second Subscription 
_queueWrapper.Messages.Subscribe(....) 

так для каждой подписки, сообщения недвижимости возвращал новый IObservable. Чтобы это исправить, я переместил инициализацию наблюдаемого конструктора QueueWrapper то есть следующий код:

public QueueWrapper() { 
    _messages = Observable.FromEventPattern<OracleAQMessageAvailableEventHandler, OracleAQMessageAvailableEventArgs> (
     h => _queue.MessageAvailable += h, 
     h => _queue.MessageAvailable -= h) 
    .Where(x => x.EventArgs.AvailableMessages > 0) 
    .Select(x => 
     { 
      OracleAQMessage msg = _queue.Dequeue(); 
      return (UpdateMsg) msg.Payload; 
     }) 
    .Publish() 
    .Refcount(); 
} 

и моей Messages собственности просто вернуть _messages;

public IObservable<UpdateMsg> Messages { get { return _messages; } } 

После этого все началось, как и следовало ожидать.

 Смежные вопросы

  • Нет связанных вопросов^_^