У меня есть EventHub, настроенный в Azure, а также группа потребителей для чтения данных. Он работал нормально в течение нескольких дней. Внезапно я вижу, что задержка входящих данных (около 3 дней). Я использую Windows Service для использования данных на моем сервере. У меня около 500 входящих сообщений в минуту. Может ли кто-нибудь помочь мне понять это?Получение данных из EventHub задерживается
ответ
Возможно, вы слишком медленно обрабатываете их. Поэтому работа над этим будет расти, и вы отстанете.
Чтобы получить некоторое представление в том, где вы находитесь в случае поток можно использовать такой код:
private void LogProgressRecord(PartitionContext context)
{
if (namespaceManager == null)
return;
var currentSeqNo = context.Lease.SequenceNumber;
var lastSeqNo = namespaceManager.GetEventHubPartition(context.EventHubPath, context.ConsumerGroupName, context.Lease.PartitionId).EndSequenceNumber;
var delta = lastSeqNo - currentSeqNo;
logWriter.Write(
$"Last processed seqnr for partition {context.Lease.PartitionId}: {currentSeqNo} of {lastSeqNo} in consumergroup '{context.ConsumerGroupName}' (lag: {delta})",
EventLevel.Informational);
}
namespaceManager строится так:
namespaceManager = NamespaceManager.CreateFromConnectionString("Endpoint=sb://xxx.servicebus.windows.net/;SharedAccessKeyName=yyy;SharedAccessKey=zzz");
Я называю этот метод протоколирования в CloseAsync
метод:
public Task CloseAsync(PartitionContext context, CloseReason reason)
{
LogProgressRecord(context);
return Task.CompletedTask;
}
logWriter
лишь некоторые Loggi ng класс, который я использовал для записи информации в хранилище blob.
Теперь она выводит сообщения, как
Последняя переработанного seqnr для раздела 3: 32780931 из 32823804 в consumergroup 'телеметрической' (лаг: 42873)
так, когда отставание очень высок, вы могли бы обрабатывать события, которые произошли давно. В этом случае вам нужно увеличить/увеличить процессор.
Если вы заметили задержку, вы должны измерить, сколько времени требуется для обработки заданного количества предметов. Затем вы можете попытаться оптимизировать производительность и посмотреть, улучшится ли она. Мы сделали это как:
public async Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> events)
{
try
{
stopwatch.Restart();
// process items here
stopwatch.Stop();
await CheckPointAsync(context);
logWriter.Write(
$"Processed {events.Count()} events in {stopwatch.ElapsedMilliseconds}ms using partition {context.Lease.PartitionId} in consumergroup {context.ConsumerGroupName}.",
EventLevel.Informational);
}
}
Спасибо, Питер за ваше драгоценное время. У меня нет дорогостоящей операции в dataprocessor. Я просто вставляю входящую запись в плоскую таблицу, используя EF. Я только что проверил задержку и это более 100000 для каждого раздела (есть 4 раздела). Можно ли запускать несколько экземпляров службы Windows и составлять отставание? – vishnu
Да, но обратите внимание, что в зависимости от базы данных может быть, что EF/DB просто не может справиться с нагрузкой. 500 сообщений в секунду не так уж много. Вы должны измерить время ваших действий. См. Обновленный ответ. –
Да может быть. Но у меня есть пакетное обновление 25. Может быть, 25 - это небольшое число, я проверю. Между ними существует ли ограничение количества активных слушателей для группы потребителей? Потому что я также планировал сделать еще несколько экземпляров. В VS он бросает фатальную ошибку, если я пытаюсь запустить уже запущенную группу пользователей в течение нескольких минут. – vishnu
Как вы читаете данные из eventhub? Вы используете экземпляр IEventProcessor? –
@PeterBons Да, я использую экземпляр IEventProcessor. – vishnu