Я пытаюсь использовать Reactive extensions с Oracle AQ. Когда сообщение появляется в Очереди Oracle, он запускает «OracleAQMessageAvailableEvent», который сообщает потребителю, что есть сообщение. Внутри OracleAQMessageAvailableEventHandler потребитель вызывает OracleAQQueue.Dequeue() для извлечения сообщения.Как создать Hot наблюдаемый в реактивном расширении
У меня есть выше работающий с RX. Ниже приведен код, который я использовал.
var messages = Observable.FromEventPattern<OracleAQMessageAvailableEventHandler, OracleAQMessageAvailableEventArgs> (
h => _queue.MessageAvailable += h, h => _queue.MessageAvailable -= h)
.Where(x => x.EventArgs.AvailableMessages > 0)
.Select(x =>
{
OracleAQMessage msg = _queue.Dequeue();
return (UpdateMsg) msg.Payload;
});
messages.subscribe(....)
Проблема заключается в том, что если я подписаться на сообщения, как только все работает, но если я подписаться на сообщения несколько раз (то есть несколько потребителей внутри моего приложения), то каждый потребитель будет пытаться назвать «_queue.Dequeue()» и каждый вызов после первого вызова не будет, если у нас нет нового сообщения.
Может ли кто-нибудь указать мне, что мне делать. Я думаю, мой сценарий для Горячего Наблюдения, но я изо всех сил пытаюсь обнять его.
Спасибо за ваш комментарий, но он по-прежнему пытается вызвать _queue.Dequeue от нескольких подписчиков. Есть идеи? – tangokhi
Как вы думаете, я должен использовать объект вместо Observable.FromEventPattern. Пользователи могут подписаться на тему, которая отображается как IObservable. Когда сообщение получено из OracleAQ и ушел с обработчиком, я могу удалить сообщение из одного сообщения и вызвать Subject.OnNext (NewMessage), который будет принимать несколько подписчиков? – tangokhi
Хотя я думаю, что теперь все в порядке (с вашим ответом ниже), я думал, что отвечу - Нет, я не думаю, что вы должны использовать тему. Я редко думаю, что кто-то должен использовать тему. Обычно это указывает на дефект дизайна. –