2017-02-20 22 views
0

У меня возникла проблема с IMessageSessionAsyncHandlerFactory, где новые экземпляры IMessageSessionAsyncHandler не создаются, когда объем записи переходит в 0, а затем до нормального уровня.Azure Service Bus SessionHandler проблема с секционированной очередью

Чтобы быть более точным, я использую SessionHandlerOptions со значением 500 для MaxConcurrentSessions. Это позволяет считывать со скоростью более 1 кс/с. Очередь, которую я читаю, - это секционированная очередь. Объем сообщений в очереди довольно постоянный, но время от времени он становится равным 0. Когда объем возвращается к нормальному уровню, SessionFactory не создает никаких обработчиков, поэтому я больше не могу читать сообщения , Это похоже на то, что сеансы не были правильно переработаны или удерживались в виде непрерывного ожидания.

Вот код для фабричного Регистрация в:

private void RegisterHandler() 
    { 
     var sessionHandlerOptions = new SessionHandlerOptions 
     { 
      AutoRenewTimeout = TimeSpan.FromMinutes(1), 
      MessageWaitTimeout = TimeSpan.FromSeconds(1), 
      MaxConcurrentSessions = 500 
     }; 
     _queueClient.RegisterSessionHandlerFactoryAsync(new SessionHandlerFactory(_callback), sessionHandlerOptions); 
    } 

Завод класс:

public class SessionHandlerFactory : IMessageSessionAsyncHandlerFactory 
{ 
    private readonly Action<BrokeredMessage> _callback; 

    public SessionHandlerFactory(Action<BrokeredMessage> callback) 
    { 
     _callback = callback; 
    } 

    public IMessageSessionAsyncHandler CreateInstance(MessageSession session, BrokeredMessage message) 
    { 
     return new SessionHandler(session.SessionId, _callback); 
    } 

    public void DisposeInstance(IMessageSessionAsyncHandler handler) 
    { 
     var disposable = handler as IDisposable; 

     disposable?.Dispose(); 
    } 
} 

И обработчик:

public class SessionHandler : MessageSessionAsyncHandler 
{ 
    private readonly Action<BrokeredMessage> _callback; 

    public SessionHandler(string sessionId, Action<BrokeredMessage> callback) 
    { 
     SessionId = sessionId; 
     _callback = callback; 
    } 

    public string SessionId { get; } 

    protected override async Task OnMessageAsync(MessageSession session, BrokeredMessage message) 
    { 
     try 
     { 
      _callback(message); 
     } 
     catch (Exception ex) 
     { 
      Logger.Error(...); 
     } 
    } 

Я могу видеть, что сессия обработчики закрыты и что фабрики расположены, когда запись/чтение находится на нормальном уровне. Однако, как только очередь опустеет, нет никакого способа создания новых обработчиков сеансов. Существует ли политика для выделения идентификаторов сеансов, которая запрещает перераспределение тех же сеансов после периода бездействия?

Edit 1: Я добавляю две фотографии, чтобы проиллюстрировать поведение: enter image description here

Когда автор останавливается и перезапускается, бегущая читатель не в состоянии читать столько, сколько раньше.

Количество сеансов, созданных после этого момента также значительно ниже, чем раньше: enter image description here

ответ

0

объем сообщений в очереди, а постоянно, но время от времени он получает до 0. Когда том возвращается к нормальному уровню, SessionFactory не порождает никаких обработчиков, поэтому я больше не могу читать сообщения. Это похоже на то, что сеансы не были правильно переработаны или удерживались в виде непрерывного ожидания.

При использовании IMessageSessionHandlerFactory для управления создаются IMessageSessionAsyncHandler экземпляров, вы могли бы попытаться войти создание и уничтожение для всех ваших IMessageSessionAsyncHandler экземпляров.

Основываясь на вашем коде, я создал консольное приложение для этой проблемы на моей стороне.Вот мой фрагмент кода для инициализации клиента очереди и обработки сообщений:

InitializeReceiver

static void InitializeReceiver(string connectionString, string queuePath) 
{ 
    _queueClient = QueueClient.CreateFromConnectionString(connectionString, queuePath, ReceiveMode.PeekLock); 

    var sessionHandlerOptions = new SessionHandlerOptions 
    { 
     AutoRenewTimeout = TimeSpan.FromMinutes(1), 
     MessageWaitTimeout = TimeSpan.FromSeconds(5), 
     MaxConcurrentSessions = 500 
    }; 
    _queueClient.RegisterSessionHandlerFactoryAsync(new SessionHandlerFactory(OnMessageHandler), sessionHandlerOptions); 
} 

OnMessageHandler

static void OnMessageHandler(BrokeredMessage message) 
{ 
    var body = message.GetBody<Stream>(); 

    dynamic recipeStep = JsonConvert.DeserializeObject(new StreamReader(body, true).ReadToEnd()); 
    lock (Console.Out) 
    { 
     Console.ForegroundColor = ConsoleColor.Cyan; 
     Console.WriteLine(
      "Message received: \n\tSessionId = {0}, \n\tMessageId = {1}, \n\tSequenceNumber = {2}," + 
      "\n\tContent: [ title = {3} ]", 
      message.SessionId, 
      message.MessageId, 
      message.SequenceNumber, 
      recipeStep.title); 
     Console.ResetColor(); 
    } 
    Task.Delay(TimeSpan.FromSeconds(3)).Wait(); 
    message.Complete(); 
} 

За моим испытанием, SessionHandler может работать, как и ожидалось, когда объем сообщений в очереди от нормального до нуля и от нуля до нормального в течение некоторого времени следующим образом:

Я также попытался использовать QueueClient.RegisterSessionHandlerAsync, чтобы проверить эту проблему, и она работает, как хорошо. Кроме того, я нашел этот образец git около Service Bus Sessions, вы можете сослаться на него.

+0

Спасибо, Брюс, я добавил два изображения к вопросу, чтобы проиллюстрировать поведение. –