2016-05-06 12 views
0

В настоящее время я работаю над Internet Of Things, в моем текущем проекте я был создан One Azure Cloud Service Project, в котором я создал роль рабочего, внутри роль рабочего, которую я написал ниже строк кода.Когда будет запущен метод ProcessEventsAsync (контекст PartitionContext, ienumerable <EventData>)

public class WorkerRole : RoleEntryPoint 
{ 
    private readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource(); 
    private readonly ManualResetEvent runCompleteEvent = new ManualResetEvent(false); 

    private static string connectionString; 
    private static string eventHubName; 
    public static ServiceClient iotHubServiceClient { get; private set; } 
    public static EventHubClient eventHubClient { get; private set; } 

    public override void Run() 
    { 
     Trace.TraceInformation("EventsForwarding Run()...\n"); 

     try 
     { 
      this.RunAsync(this.cancellationTokenSource.Token).Wait(); 
     } 
     finally 
     { 
      this.runCompleteEvent.Set(); 
     } 
    } 

    public override bool OnStart() 
    { 
     // Set the maximum number of concurrent connections 
     ServicePointManager.DefaultConnectionLimit = 12; 

     // For information on handling configuration changes 
     // see the MSDN topic at http://go.microsoft.com/fwlink/?LinkId=166357. 

     bool result = base.OnStart(); 

     Trace.TraceInformation("EventsForwarding OnStart()...\n"); 

     connectionString = ConfigurationManager.AppSettings["Microsoft.ServiceBus.ConnectionString"]; 
     eventHubName = ConfigurationManager.AppSettings["Microsoft.ServiceBus.EventHubName"]; 

     string storageAccountName = ConfigurationManager.AppSettings["AzureStorage.AccountName"]; 
     string storageAccountKey = ConfigurationManager.AppSettings["AzureStorage.Key"]; 
     string storageAccountString = string.Format("DefaultEndpointsProtocol=https;AccountName={0};AccountKey={1}", 
      storageAccountName, storageAccountKey); 

     string iotHubConnectionString = ConfigurationManager.AppSettings["AzureIoTHub.ConnectionString"]; 
     iotHubServiceClient = ServiceClient.CreateFromConnectionString(iotHubConnectionString); 
     eventHubClient = EventHubClient.CreateFromConnectionString(connectionString, eventHubName); 

     var defaultConsumerGroup = eventHubClient.GetDefaultConsumerGroup(); 

     string eventProcessorHostName = "SensorEventProcessor"; 
     EventProcessorHost eventProcessorHost = new EventProcessorHost(eventProcessorHostName, eventHubName, defaultConsumerGroup.GroupName, connectionString, storageAccountString); 
     eventProcessorHost.RegisterEventProcessorAsync<SensorEventProcessor>().Wait(); 

     Trace.TraceInformation("Receiving events...\n"); 

     return result; 
    } 

    public override void OnStop() 
    { 
     Trace.TraceInformation("EventsForwarding is OnStop()..."); 

     this.cancellationTokenSource.Cancel(); 
     this.runCompleteEvent.WaitOne(); 

     base.OnStop(); 

     Trace.TraceInformation("EventsForwarding has stopped"); 
    } 

    private async Task RunAsync(CancellationToken cancellationToken) 
    { 
     while (!cancellationToken.IsCancellationRequested) 
     { 
      //Trace.TraceInformation("EventsToCommmandsService running...\n"); 
      await Task.Delay(1000); 

     } 
    } 
} 

Следующая я написал ниже строки кода в SensorEventProcessor, для приема сообщений от хаба событий и отправить эти сообщения IoT ступице.

class SensorEventProcessor : IEventProcessor 
{ 
    Stopwatch checkpointStopWatch; 
    PartitionContext partitionContext; 

    public async Task CloseAsync(PartitionContext context, CloseReason reason) 
    { 
     Trace.TraceInformation(string.Format("EventProcessor Shuting Down. Partition '{0}', Reason: '{1}'.", this.partitionContext.Lease.PartitionId, reason.ToString())); 
     if (reason == CloseReason.Shutdown) 
     { 
      await context.CheckpointAsync(); 
     } 
    } 

    public Task OpenAsync(PartitionContext context) 
    { 
     Trace.TraceInformation(string.Format("Initializing EventProcessor: Partition: '{0}', Offset: '{1}'", context.Lease.PartitionId, context.Lease.Offset)); 
     this.partitionContext = context; 
     this.checkpointStopWatch = new Stopwatch(); 
     this.checkpointStopWatch.Start(); 
     return Task.FromResult<object>(null); 
    } 

    public async Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages) 
    { 
     Trace.TraceInformation("\n"); 
     Trace.TraceInformation("........ProcessEventsAsync........"); 
     //string commandParameterNew = "{\"Name\":\"AlarmThreshold\",\"Parameters\":{\"SensorId\":\"" + "Hello World" + "\"}}"; 
     //await WorkerRole.iotHubServiceClient.SendAsync("astranidevice", new Microsoft.Azure.Devices.Message(Encoding.UTF8.GetBytes(commandParameterNew))); 
     foreach (EventData eventData in messages) 
     { 
      try 
      { 
       string jsonString = Encoding.UTF8.GetString(eventData.GetBytes()); 

       Trace.TraceInformation(string.Format("Message received at '{0}'. Partition: '{1}'", 
        eventData.EnqueuedTimeUtc.ToLocalTime(), this.partitionContext.Lease.PartitionId)); 

       Trace.TraceInformation(string.Format("-->Raw Data: '{0}'", jsonString)); 

       SimpleTemperatureAlertData newSensorEvent = this.DeserializeEventData(jsonString); 

       Trace.TraceInformation(string.Format("-->Serialized Data: '{0}', '{1}', '{2}', '{3}', '{4}'", 
        newSensorEvent.Time, newSensorEvent.RoomTemp, newSensorEvent.RoomPressure, newSensorEvent.RoomAlt, newSensorEvent.DeviceId)); 

       // Issuing alarm to device. 
       string commandParameterNew = "{\"Name\":\"AlarmThreshold\",\"Parameters\":{\"SensorId\":\"" + "Hello World" + "\"}}"; 
       Trace.TraceInformation("Issuing alarm to device: '{0}', from sensor: '{1}'", newSensorEvent.DeviceId, newSensorEvent.RoomTemp); 
       Trace.TraceInformation("New Command Parameter: '{0}'", commandParameterNew); 
       await WorkerRole.iotHubServiceClient.SendAsync(newSensorEvent.DeviceId, new Microsoft.Azure.Devices.Message(Encoding.UTF8.GetBytes(commandParameterNew))); 
      } 
      catch (Exception ex) 
      { 
       Trace.TraceInformation("Error in ProssEventsAsync -- {0}\n", ex.Message); 
      } 
     } 

     await context.CheckpointAsync(); 
    } 
    private SimpleTemperatureAlertData DeserializeEventData(string eventDataString) 
    { 
     return JsonConvert.DeserializeObject<SimpleTemperatureAlertData>(eventDataString); 
    } 

} 

Когда я отлаживать код, то ProcessEventsAsync (PartitionContext контекст, IEnumerable сообщения) метод никогда не будет звонить и просто ввести в OpenAsync метод(), затем itstop отладки.

Скажите, пожалуйста, где я ошибся в своем проекте и скажу, когда вызовет метод ProcessEventsAsync().

С уважением,

Прадипом

ответ

1

IEventProcessor.ProcessEventsAsync вызывается, когда есть какие-либо необработанные сообщения в EventHub.

Концентратор событий содержит несколько разделов. Раздел - упорядоченная последовательность событий. Внутри раздела каждое событие включает смещение. Это смещение используется потребителями (IEventProcessor) для отображения местоположения в последовательности событий для данного раздела. Когда IEventProcessor подключается (EventProcessorHost.RegisterEventProcessorAsync), он передает это смещение в Event Hub, чтобы указать местоположение, с которого начнется чтение. Когда есть необработанные сообщения (события с более высоким смещением), они доставляются в IEventProcessor. Контрольная точка используется для сохранения смещения обработанных сообщений (PartitionContext.CheckpointAsync).

Вы можете найти подробную информацию о внутренностях EventHub: Azure Event Hubs overview

Вы посланных сообщений в EventHub (EventHubClient.SendAsync (данныеСобытия))?

+0

Да, но не используется метод SendAsync() для отправки сообщений в концентратор событий. в моем требовании я был назначен выходным заданием в качестве центра событий в потоковой аналитике, аналитика потока отправит значения в концентратор событий в этом я использовал вход в качестве концентратора IoT. – pradeep

+0

Не могли бы вы подтвердить через Azure Portal сообщения, полученные недавно EventHub? –

+0

Я думаю, что он получен, но на моей лазурной информационной панели портала не отображаются сообщения о хабе события. – pradeep

 Смежные вопросы

  • Нет связанных вопросов^_^