У меня возникла проблема с IMessageSessionAsyncHandlerFactory
, где новые экземпляры IMessageSessionAsyncHandler
не создаются, когда объем записи переходит в 0, а затем до нормального уровня.Azure Service Bus SessionHandler проблема с секционированной очередью
Чтобы быть более точным, я использую SessionHandlerOptions
со значением 500 для MaxConcurrentSessions
. Это позволяет считывать со скоростью более 1 кс/с. Очередь, которую я читаю, - это секционированная очередь. Объем сообщений в очереди довольно постоянный, но время от времени он становится равным 0. Когда объем возвращается к нормальному уровню, SessionFactory не создает никаких обработчиков, поэтому я больше не могу читать сообщения , Это похоже на то, что сеансы не были правильно переработаны или удерживались в виде непрерывного ожидания.
Вот код для фабричного Регистрация в:
private void RegisterHandler()
{
var sessionHandlerOptions = new SessionHandlerOptions
{
AutoRenewTimeout = TimeSpan.FromMinutes(1),
MessageWaitTimeout = TimeSpan.FromSeconds(1),
MaxConcurrentSessions = 500
};
_queueClient.RegisterSessionHandlerFactoryAsync(new SessionHandlerFactory(_callback), sessionHandlerOptions);
}
Завод класс:
public class SessionHandlerFactory : IMessageSessionAsyncHandlerFactory
{
private readonly Action<BrokeredMessage> _callback;
public SessionHandlerFactory(Action<BrokeredMessage> callback)
{
_callback = callback;
}
public IMessageSessionAsyncHandler CreateInstance(MessageSession session, BrokeredMessage message)
{
return new SessionHandler(session.SessionId, _callback);
}
public void DisposeInstance(IMessageSessionAsyncHandler handler)
{
var disposable = handler as IDisposable;
disposable?.Dispose();
}
}
И обработчик:
public class SessionHandler : MessageSessionAsyncHandler
{
private readonly Action<BrokeredMessage> _callback;
public SessionHandler(string sessionId, Action<BrokeredMessage> callback)
{
SessionId = sessionId;
_callback = callback;
}
public string SessionId { get; }
protected override async Task OnMessageAsync(MessageSession session, BrokeredMessage message)
{
try
{
_callback(message);
}
catch (Exception ex)
{
Logger.Error(...);
}
}
Я могу видеть, что сессия обработчики закрыты и что фабрики расположены, когда запись/чтение находится на нормальном уровне. Однако, как только очередь опустеет, нет никакого способа создания новых обработчиков сеансов. Существует ли политика для выделения идентификаторов сеансов, которая запрещает перераспределение тех же сеансов после периода бездействия?
Edit 1: Я добавляю две фотографии, чтобы проиллюстрировать поведение:
Когда автор останавливается и перезапускается, бегущая читатель не в состоянии читать столько, сколько раньше.
Количество сеансов, созданных после этого момента также значительно ниже, чем раньше:
Спасибо, Брюс, я добавил два изображения к вопросу, чтобы проиллюстрировать поведение. –