2015-11-10 5 views
3

Я хочу, чтобы иметь возможность удалять выбранные сообщения из моей очереди.Заполните сообщение в очереди с мертвой буквой на Azure Service Bus

Как это делается?

Я постоянно получаю сообщение об ошибке:

Операция не может быть завершена, поскольку RecieveContext является Null

Я попробовал каждый подход я могу думать и читать о, вот где я сейчас:

public void DeleteMessageFromDeadletterQueue<T>(string queueName, long sequenceNumber) 
     { 
    var client = GetQueueClient(queueName, true); 
       var messages = GetMessages(client); 
       foreach(var m in messages) 
       { 
        if(m.SequenceNumber == sequenceNumber) 
        { 
         m.Complete(); 
        } 
        else 
        { 
         m.Abandon(); 
        } 
       } 
} 

/// <summary> 
     /// Return a list of all messages in a Queue 
     /// </summary> 
     /// <param name="client"></param> 
     /// <returns></returns> 
     private IEnumerable<BrokeredMessage> GetMessages(QueueClient client) 
     { 
      var peekedMessages = client.PeekBatch(0, peekedMessageBatchCount).ToList(); 
      bool getmore = peekedMessages.Count() == peekedMessageBatchCount ? true : false; 

      while (getmore) 
      { 
       var moreMessages = client.PeekBatch(peekedMessages.Last().SequenceNumber, peekedMessageBatchCount); 
       peekedMessages.AddRange(moreMessages); 
       getmore = moreMessages.Count() == peekedMessageBatchCount ? true : false; 
      } 

      return peekedMessages; 
     } 

Не знаете, почему это кажется такой трудной задачей.

ответ

3

Проблема здесь в том, что вы вызвали PeekBatch, который возвращает сообщения, которые просто заглянули. Отсутствует контекст приема, который затем можно использовать для завершения или отказа от сообщения. Операции Peek и PeekBatch возвращают только сообщения и не блокируют их вообще, даже если для принятогоMode установлено значение PeekLock. Это в основном для смены очереди, но вы не можете принять меры на них. Обратите внимание на документы как для Abandon, так и для состояния Complete, «необходимо вызывать только сообщение, которое было получено с помощью приемника, работающего в режиме Peek-Lock ReceiveMode». Здесь неясно, но операции Peek и PeekBatch не учитываются в этом, поскольку они фактически не получают контекст приема. Вот почему он терпит неудачу, когда вы пытаетесь вызвать отказ. Если вы действительно нашли ту, которую искали, это вызовет другую ошибку при вызове Complete.

Что вы хотите сделать, это использовать операцию ReceiveBatch вместо PeekBatch RecieveMode. Это фактически вытащит пакет сообщений, а затем, когда вы просмотрите их, чтобы найти тот, который вы хотите, вы действительно можете повлиять на сообщение. Когда вы запускаете отказ, он немедленно освободит сообщение, которое не является тем, что вы хотите вернуть в очередь.

Если ваша очередь в очереди невелика, обычно это не будет плохо. Если он действительно большой, то такой подход не является наиболее эффективным. Вы обрабатываете очередь мертвых букв больше как кучу и копаете ее, а не обрабатываете сообщения «по порядку». Это не редкость при работе с очередями с мертвой буквой, которые требуют ручного вмешательства, но если у вас их много, тогда может быть лучше иметь что-то, обрабатывающее очередь мертвых букв, в другой тип магазина, где вы можете более легко найти и уничтожать сообщения, но все же могут воссоздавать сообщения, которые можно нажать на другую очередь для обработки.

Могут быть другие варианты, такие как использование Defer, если вы вручную мертвы. См. How to use the MessageReceiver.Receive method by sequenceNumber on ServiceBus.

+0

спасибо за указывая свою ошибку здесь, тьфу, теперь я просто нужен способ, чтобы получить все сообщения из очереди – Slee

+0

я не думаю, вы имеете в виду это так, но вы не хотите получать ВСЕ сообщения из очереди в одно и то же время. Это немного опасно, поскольку неизвестно, сколько на самом деле находится в очереди. Это действительно для обработки очереди мертвых букв? Какие числа вы видите в очереди мертвых букв? – MikeWo

0

Я не увенчался успехом с предложением MikeWo, потому что, когда я использовал комбинацию создания экземпляра DLQ QueueClient с ReceiveMode.PeekLock и вытягивания сообщений с помощью ReceiveBatch, я использовал версии Receive/ReceiveBatch, запрашивающие сообщение по его SequenceNumber.

[в сторону: в мое приложение, я заглядывать все сообщения и перечислить их, и есть еще один обработчик для повторной очереди в основную очередь мертвой литерных сообщение основано на его конкретный номер последовательности ...]

Но вызов Receive (long sequenceNumber) или ReceiveBatch (IEnumerable sequenceNumber) в DLQClient всегда выдает исключение: «Не удалось заблокировать одно или несколько указанных сообщений. Сообщение не существует». (даже когда я только прошел 1, и это определенно в очереди).

Кроме того, по причинам, которые не ясны, использование ReceiveBatch (int messageCount) всегда возвращает только следующее сообщение в очереди независимо от того, какое значение используется как messageCount.

Что, наконец, работал для меня был следующим:

QueueClient queueClient, deadLetterClient; 
GetQueueClients(qname, ReceiveMode.PeekLock, out queueClient, out deadLetterClient); 

BrokeredMessage msg = null; 
var mapSequenceNumberToBrokeredMessage = new Dictionary<long, BrokeredMessage>(); 
while (msg == null) 
{ 
#if UseReceive 
    var message = deadLetterClient.Receive(); 
#elif UseReceiveBatch 
    var messageEnumerable = deadLetterClient.ReceiveBatch(CnCountOfMessagesToPeek).ToList(); 
    if ((messageEnumerable == null) || (messageEnumerable.Count == 0)) 
     break; 
    else if (messageEnumerable.Count != 1) 
     throw new ApplicationException("Invalid expectation that there'd always be only 1 msg returned by ReceiveBatch"); 

    // I've yet to get back more than one in the deadletter queue, but... 
    var message = messageEnumerable.First(); 
#endif 
    if (message.SequenceNumber == lMessageId) 
    { 
     msg = message; 
     break; 
    } 
    else if (mapSequenceNumberToBrokeredMessage.ContainsKey(message.SequenceNumber)) 
    { 
     // this means it's started the list over, so we didn't find it... 
     break; 
    } 
    else 
     mapSequenceNumberToBrokeredMessage.Add(message.SequenceNumber, message); 
    message.Abandon(); 
}       

if (msg == null) 
    throw new ApplicationException("Unable to find a message in the deadletter queue with the SequenceNumber: " + msgid); 

var strMessage = GetMessage(msg); 
var newMsg = new BrokeredMessage(strMessage); 
queueClient.Send(newMsg); 

msg.Complete();