2012-02-03 1 views
0

У меня вопрос 2. Ниже приведен сценарий -Как удалить сообщения из очереди, когда приложение было отключено (Oracle Advanced Queue)

Существует 2 разных процесса процесса A и процесс B. Процесс. Сообщение очереди в очереди сообщений. Процесс B dequeue - это сообщение из очереди сообщений.

1) Процесс B выключается в течение некоторого времени, но процесс A продолжает выдавать сообщение в очереди. Когда процесс B возвращается вживую, как удалить сообщения из очереди сообщений, которые были отправлены процессом A, когда Process B был отключен?

2) Очередь, которую я использую, представляет собой многопользовательскую очередь, поскольку для удаления сообщения требуется более одного процесса B. Причина дизайна заключается в том, что если один из процессов B умирает, другой процесс B все равно может продолжать обрабатывать сообщение. В то же время, если 1 экземпляр процесса B получил сообщение, он должен уведомить другой процесс B, чтобы не обрабатывать сообщение.

Я не нашел никаких образцов. Любая помощь приветствуется.

ответ

0

Я только что завершил проект с довольно схожими требованиями.

Проблема 1) Я создал таймер обслуживания Windows, который вызывает службу восстановления WCF для периодического запуска. Затем служба WCF удалит все, что попадает в очередь (до 500 сообщений для каждого вызова). Anything Enquued должен автоматически обрабатываться так, чтобы даже если этот таймер остановился после его перезапуска, он заберет там, где он остановился.

Проблема 2) Я копировал данные из Oracle в CouchBase, поэтому у меня была отметка времени для извлечения при запуске процесса и отметка времени для уже сохраненных данных в CouchBase, если первая была старше последней, тогда она не будет спасти. (Это должно было заботиться о условиях гонки).

В Oracle у меня также был триггер, который когда-то был установлен в очередь, он скопировал идентификатор и время, указанное в секундах, на вторую таблицу. Периодически эта вторая таблица проверяется и если элемент был удален из очереди в таблице очередей, но вторая таблица не была обновлена, чтобы отразить это в течение определенного периода времени службой WCF, она повторно занесет данные в данные, поскольку что-то не удалось в процессе ,

В случае, если это полезно, вот пример службы восстановления wcf, использующей odp.net.

OracleAQQueue _queueObj; 
OracleConnection _connObj; 
_connString = ConfigurationManager.ConnectionStrings["connectionstring"].ToString(); 
_connObj = new OracleConnection(_connString); 
_queueObj = new OracleAQQueue("QUEUENAME", _connObj); 
_connObj.Open(); 

    int i = 0; 
    bool messageAvailable = true; 

    while (messageAvailable && i < 500) 
    { 
    OracleTransaction _txn = _connObj.BeginTransaction(); 
    //Makes dequeue part of transaction 
    _queueObj.DequeueOptions.Visibility = OracleAQVisibilityMode.OnCommit; 
    _queueObj.DequeueOptions.ConsumerName = "CONSUMERNAME" 
    try 
    { 
     //Wait number of seconds for dequeue, default is forever 
     _queueObj.DequeueOptions.Wait = 2; 
     _queueObj.MessageType = OracleAQMessageType.Raw; 
     _queueObj.DequeueOptions.ProviderSpecificType = true; 
     OracleAQMessage _depMsq = _queueObj.Dequeue(); 
     var _binary = (OracleBinary)_depMsq.Payload; 
     byte[] byteArray = _binary.Value; 
     _txn.Commit(); 
    } 
    catch (Exception ex) 
    { 
     //This catch will always fire when all messages have been dequeued 
     messageAvailable = false; 
     if (ex.Message.IndexOf("end-of-fetch during message dequeue") == -1) 
      { 
      //Actual error present. 
      log.Info("Problem occurred during dequeue process : " + ex.Message); 
      } 
    } 
    } 

    _queueObj.Dispose(); 
    _connObj.Close(); 
    _connObj.Dispose(); 
    _connObj = null;