1

У меня есть консольное приложение для чтения всех брокерских сообщений, присутствующих в подписке на Azure Service Bus. У меня там около 3500 сообщений. Это мой код, чтобы читать сообщения:SubscriptionClient.RecieveBatch не получает все брокерские сообщения

SubscriptionClient client = messagingFactory.CreateSubscriptionClient(topic, subscription); 
long count = namespaceManager.GetSubscription(topic, subscription).MessageCountDetails.ActiveMessageCount; 
Console.WriteLine("Total messages to process : {0}", count.ToString()); //Here the number is showing correctly 
IEnumerable<BrokeredMessage> dlIE = null; 
dlIE = client.ReceiveBatch(Convert.ToInt32(count)); 

Когда я выполнить код, в dlIE, я могу видеть только 256 сообщений. Я также попытался предоставить счет предварительной выборки, например, client.PrefetchCount, но затем он также возвращает только 256 сообщений.

Я думаю, что существует некоторое ограничение на количество сообщений, которые могут быть получены за один раз. Однако на странице msdn на методе RecieveBatch нет такой вещи. Что я могу сделать, чтобы получать все сообщения за раз?

Примечание:

  1. Я только хочу, чтобы прочитать сообщение, а затем пусть он существует на сервисной шине. Поэтому я не использую метод message.complete.

  2. Я не могу удалить и воссоздать тему/подписку с Service Bus.

Edit:

Я использовал PeekBatch вместо ReceiveBatch так:

IEnumerable<BrokeredMessage> dlIE = null; 
          List<BrokeredMessage> bmList = new List<BrokeredMessage>(); 
    long i = 0; 
    dlIE = subsciptionClient.PeekBatch(Convert.ToInt32(count)); // count is the total number of messages in the subscription. 
    bmList.AddRange(dlIE); 
    i = dlIE.Count(); 
if(i < count) 
    {   
while(i < count) 
    { 
    IEnumerable<BrokeredMessage> dlTemp = null; 
    dlTemp = subsciptionClient.PeekBatch(i, Convert.ToInt32(count)); 
    bmList.AddRange(dlTemp); 
    i = i + dlTemp.Count(); 
    } 
    } 

У меня есть сообщения в подписке. Когда вызывается первый peekBatch, он получает 250 сообщений. поэтому он переходит в цикл while с PeekBatch(250,3225). Каждый раз, когда принимается только 250 сообщений. Окончательные итоговые сообщения, которые я получаю в выходном списке, - с дубликатами. Я не могу понять, как это происходит.

ответ

1

Я понял это. Клиент подписки запоминает последнюю полученную партию, а при повторном вызове получает следующую партию.

Так что код будет:

IEnumerable<BrokeredMessage> dlIE = null; 
List<BrokeredMessage> bmList = new List<BrokeredMessage>(); 
    long i = 0; 
    while (i < count) 
    { 
    dlIE = subsciptionClient.PeekBatch(Convert.ToInt32(count)); 
    bmList.AddRange(dlIE); 
    i = i + dlIE.Count(); 
    } 

Благодаря MikeWo для руководства

Примечание: Там, кажется, какой-то предел размера по количеству сообщений, которые вы можете PEEK вовремя. Я пробовал с различными подписками, и количество полученных сообщений было различным для каждого.

+0

Я упомянул об этом в своем ответе. «Используя тот же SubscriptionClient с PeekBatch под капотом, последний выведенный порядковый номер хранится так же, как и вы прокручиваете его, он должен отслеживать и проходить через всю очередь». – MikeWo

+0

@MikeПростите, что я не читал ваш ответ здесь, но для какого-то другого вопроса в SO. Фактически, прежде чем я увидел ваш ответ здесь, я работал с peekbatch на основе вашего ответа где-то в другом месте. – nitinvertigo

+0

@MikeWo Существует ли ограничение на количество сообщений, полученных PeekBatch? Он отлично работает для 3200, но когда в подписке есть 5000 сообщений, он показывает эту ошибку: Внутренняя ошибка сервера: сервер не дал значимого ответа; это может быть вызвано преждевременным отключением сеанса. – nitinvertigo

2

Является ли тема, которую вы пишете, раздроблена случайно? Когда вы получаете сообщения от секционированного объекта, он будет извлекать только из одного из разделов за раз. From MSDN

«Когда клиент хочет получать сообщение из секционированной очереди или из подписки секционированного раздела, служебная шина запрашивает все фрагменты сообщений, а затем возвращает первое сообщение, которое возвращается из любого из хранилища сообщений в ресивер. Служебная шина кэширует другие сообщения и возвращает их, когда получает дополнительные запросы на получение. Получающий клиент не знает о разметке, поведение клиента в разбивке на разделы или тему (например, чтение, завершение , defer, deadletter, prefetching) идентичен поведению обычного объекта. "

Возможно, не стоит предполагать, что даже при использовании без секционированного объекта вы получите все сообщения в один момент с помощью методов Receive или Peek. Было бы гораздо более эффективно обрабатывать сообщения в гораздо меньших партиях, особенно если ваше сообщение имеет для них приличный размер или неопределенный размер.

Поскольку вы действительно не хотите удалять сообщение из очереди, я бы предложил использовать PeekBatch вместо ReceiveBatch. Это позволяет получить копию сообщения и не блокировать его. Я бы предложил цикл, используя тот же SubscriptionClient, в сочетании с PeekBatch.Используя тот же SubscriptionClient с PeekBatch под капотом, последний выведенный порядковый номер сохраняется так же, как и вы его прокручиваете, он должен отслеживать и проходить через всю очередь. Это позволит вам прочитать всю очередь.

+0

привет спасибо за ответ. Я использовал подход PeekBatch и имею некоторые проблемы с ним. Я обновил вопрос. Не могли бы вы взглянуть на него. – nitinvertigo

0

Я столкнулся с аналогичной проблемой, когда client.ReceiveBatchAsync(....) не получал никаких данных из подписки в шине служебной шины.

После некоторого рытья я узнал, что для каждого абонента есть бит для включения пакетных операций. Это можно активировать только через powershell. Ниже приводится команда, которую я использовал:

$subObject = Get-AzureRmServiceBusSubscription -ResourceGroup '#resourceName' -NamespaceName '#namespaceName' -Topic '#topicName' -SubscriptionName '#subscriptionName' 
$subObject.EnableBatchedOperations = $True 

Set-AzureRmServiceBusSubscription -ResourceGroup '#resourceName' -NamespaceName '#namespaceName' -Topic '#topicName'-SubscriptionObj $subObject 

Более подробную информацию можно найти here. Хотя он все еще не загружал все сообщения, по крайней мере, он начал очищать очередь. Насколько мне известно, параметр размера партии присутствует только в качестве предложения для служебной шины, но не для правила.

Надеюсь, это поможет!

 Смежные вопросы

  • Нет связанных вопросов^_^