2016-11-28 15 views
0

У меня есть EventHub, настроенный в Azure, а также группа потребителей для чтения данных. Он работал нормально в течение нескольких дней. Внезапно я вижу, что задержка входящих данных (около 3 дней). Я использую Windows Service для использования данных на моем сервере. У меня около 500 входящих сообщений в минуту. Может ли кто-нибудь помочь мне понять это?Получение данных из EventHub задерживается

+0

Как вы читаете данные из eventhub? Вы используете экземпляр IEventProcessor? –

+0

@PeterBons Да, я использую экземпляр IEventProcessor. – vishnu

ответ

1

Возможно, вы слишком медленно обрабатываете их. Поэтому работа над этим будет расти, и вы отстанете.

Чтобы получить некоторое представление в том, где вы находитесь в случае поток можно использовать такой код:

private void LogProgressRecord(PartitionContext context) 
{ 
    if (namespaceManager == null) 
     return; 

    var currentSeqNo = context.Lease.SequenceNumber; 
    var lastSeqNo = namespaceManager.GetEventHubPartition(context.EventHubPath, context.ConsumerGroupName, context.Lease.PartitionId).EndSequenceNumber; 
    var delta = lastSeqNo - currentSeqNo; 

    logWriter.Write(
      $"Last processed seqnr for partition {context.Lease.PartitionId}: {currentSeqNo} of {lastSeqNo} in consumergroup '{context.ConsumerGroupName}' (lag: {delta})", 
      EventLevel.Informational); 
} 

namespaceManager строится так:

namespaceManager = NamespaceManager.CreateFromConnectionString("Endpoint=sb://xxx.servicebus.windows.net/;SharedAccessKeyName=yyy;SharedAccessKey=zzz"); 

Я называю этот метод протоколирования в CloseAsync метод:

public Task CloseAsync(PartitionContext context, CloseReason reason) 
{ 
    LogProgressRecord(context); 

    return Task.CompletedTask; 
} 

logWriter лишь некоторые Loggi ng класс, который я использовал для записи информации в хранилище blob.

Теперь она выводит сообщения, как

Последняя переработанного seqnr для раздела 3: 32780931 из 32823804 в consumergroup 'телеметрической' (лаг: 42873)

так, когда отставание очень высок, вы могли бы обрабатывать события, которые произошли давно. В этом случае вам нужно увеличить/увеличить процессор.

Если вы заметили задержку, вы должны измерить, сколько времени требуется для обработки заданного количества предметов. Затем вы можете попытаться оптимизировать производительность и посмотреть, улучшится ли она. Мы сделали это как:

public async Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> events) 
{ 
     try 
     { 
      stopwatch.Restart(); 

      // process items here 

      stopwatch.Stop(); 

      await CheckPointAsync(context); 

      logWriter.Write(
       $"Processed {events.Count()} events in {stopwatch.ElapsedMilliseconds}ms using partition {context.Lease.PartitionId} in consumergroup {context.ConsumerGroupName}.", 
       EventLevel.Informational); 
     } 
} 
+0

Спасибо, Питер за ваше драгоценное время. У меня нет дорогостоящей операции в dataprocessor. Я просто вставляю входящую запись в плоскую таблицу, используя EF. Я только что проверил задержку и это более 100000 для каждого раздела (есть 4 раздела). Можно ли запускать несколько экземпляров службы Windows и составлять отставание? – vishnu

+1

Да, но обратите внимание, что в зависимости от базы данных может быть, что EF/DB просто не может справиться с нагрузкой. 500 сообщений в секунду не так уж много. Вы должны измерить время ваших действий. См. Обновленный ответ. –

+0

Да может быть. Но у меня есть пакетное обновление 25. Может быть, 25 - это небольшое число, я проверю. Между ними существует ли ограничение количества активных слушателей для группы потребителей? Потому что я также планировал сделать еще несколько экземпляров. В VS он бросает фатальную ошибку, если я пытаюсь запустить уже запущенную группу пользователей в течение нескольких минут. – vishnu