2016-08-22 6 views
1

Следующий код подключается к концентратору событий Azure, он выполняет итерацию по всем разделам, а затем считывает сообщения, которые должны обрабатываться и вставляться в базу данных (что нужно сделать) , код работает отлично, однако каждый раз, когда ИТ ЧИТАЕТ ВСЕ СООБЩЕНИЯ.Улучшение кода для чтения сообщений из концентратора событий, в зависимости от даты последнего сообщения

Это будет установлено как Azure WebJob, поэтому оно будет работать непрерывно, в режиме реального времени, без остановки.

  1. Как я могу улучшить этот код, чтобы читать только необработанные сообщения?
  2. Есть ли лучший способ кодировать секцию while/for, вы бы сделали это по-другому?

    static void Main(string[] args) 
    { 
        ServiceBusConnectionStringBuilder builder = new ServiceBusConnectionStringBuilder(ConfigurationManager.AppSettings["ConnectionString"].ToString()); 
        builder.TransportType = TransportType.Amqp; 
        MessagingFactory factory = MessagingFactory.CreateFromConnectionString(ConfigurationManager.AppSettings["ConnectionString"].ToString()); 
        EventHubClient client = factory.CreateEventHubClient(ConfigurationManager.AppSettings["eventHubEntity"].ToString()); 
        EventHubConsumerGroup group = client.GetDefaultConsumerGroup(); 
    
        CancellationTokenSource cts = new CancellationTokenSource(); 
        System.Console.CancelKeyPress += (s, e) => 
        { 
         e.Cancel = true; 
         cts.Cancel(); 
         Console.WriteLine("Exiting..."); 
        }; 
        var d2cPartitions = client.GetRuntimeInformation().PartitionIds; 
    
        while (true) 
        { 
         foreach (string partition in d2cPartitions) 
         { 
          EventHubReceiver receiver = group.CreateReceiver(partition, DateTime.MinValue); 
          EventData data = receiver.Receive(); 
          Console.WriteLine("{0} {1} {2}", data.PartitionKey, data.EnqueuedTimeUtc.ToLocalTime(), Encoding.UTF8.GetString(data.GetBytes())); 
          var dateLastMessage = data.EnqueuedTimeUtc.ToLocalTime(); 
          receiver.Close(); 
          client.Close(); 
          factory.Close(); 
         } 
        } 
    } 
    
+0

что не так с кодом в настоящее время @gina? также для объектов-получателей, клиентов и фабричных объектов, вы также убедитесь, что их распоряжаетесь, возможно, это лучше всего подходит для проверки кода. – MethodMan

+0

проблема в том, что он всегда считывает все сообщения из концентратора событий, метод создания приемника имеет параметр смещения даты и времени, она спрашивает, есть ли способ читать только сообщения, которые еще не обработаны. –

ответ

1

Использование EventHubReceiver не дает вам контроль вам нужно. Вместо этого вы должны использовать EventProcessorHost, который позволяет использовать контрольные точки, которые можно использовать для возобновления обработки сообщений.

См. http://blogs.biztalk360.com/understanding-consumer-side-of-azure-event-hubs-checkpoint-initialoffset-eventprocessorhost/ и https://blogs.msdn.microsoft.com/servicebus/2015/01/16/event-processor-host-best-practices-part-1/ для чтения фона.

См. https://azure.microsoft.com/en-us/documentation/articles/event-hubs-csharp-ephcs-getstarted/#receive-messages-with-eventprocessorhost для учебника.

Вы можете легко разместить EventProcessor в WebJob.