2016-07-22 8 views
0

У нас есть облачная служба, использующая роль рабочего для обработки сообщений, получаемых из темы, созданной на Azure Service Bus.Активность рабочего ролика Azure перестает обрабатывать сообщение через 60 секунд

Само сообщение кажется неповрежденным и обычно принимается и обрабатывается правильно. Однако в некоторых случаях сообщение, похоже, перестает обрабатываться (журнал заканчивается, и больше никаких ссылок на обрабатываемое сообщение не видно в нашем WadLogsTable). Из моих исследований это может происходить из-за того, что рабочая роль сохраняет свое соединение открытым и простаивающим дольше, чем секунды. Как я могу предотвратить отказ от этих длинных сообщений?

Код для нашей рабочей роли ниже.

public class WorkerRole : RoleEntryPoint 
{ 
    private static StandardKernel _kernel; 
    private readonly ManualResetEvent _completedEvent = new ManualResetEvent(false); 
    private BaseRepository<CallData> _callDataRepository; 
    private BaseRepository<CallLog> _callLogRepository; 

    private SubscriptionClient _client; 
    private NamespaceManager _nManager; 
    private OnMessageOptions _options; 
    private BaseRepository<Site> _siteRepository; 

    public override void Run() 
    { 
     try 
     { 
      List<CallInformation> callInfo; 
      Trace.WriteLine("Starting processing of messages"); 

      // Initiates the message pump and callback is invoked for each message that is received, calling close on the client will stop the pump. 

      _client.OnMessage(message => 
      { 
       // Process message from subscription. 
       Trace.TraceInformation("Call Received. Ready to process message "); 
       message.RenewLock(); 
       callInfo = message.GetBody<List<CallInformation>>(); 
       writeCallData(callInfo); 


       Trace.TraceInformation("Call Processed. Clearing from topic."); 
      }, _options); 
     } 
     catch (Exception e) 
     { 
      Trace.TraceInformation("Error: " + e.Message + "---" + e.StackTrace); 
     } 
    } 

    private void writeCallData(List<CallInformation> callList) 
    { 
     try 
     { 
      Trace.TraceInformation("Calls received: " + callList.Count); 
      foreach (var callInfo in callList) 
      { 
       Trace.TraceInformation("Unwrapping call..."); 
       var call = callInfo.CallLog.Unwrap(); 
       Trace.TraceInformation("Begin Processing: Local Call " + call.ID + " with " + callInfo.DataPoints.Length + " datapoints"); 
       Trace.TraceInformation("Inserting Call..."); 
       _callLogRepository.ExecuteSqlCommand(/*SNIP: Insert call*/); 
        Trace.TraceInformation("Call entry written. Now building datapoint list..."); 
        var datapoints = callInfo.DataPoints.Select(datapoint => datapoint.Unwrap()).ToList(); 
        Trace.TraceInformation("datapoint list constructed. Processing datapoints..."); 
        foreach (var data in datapoints) 
        { 
         /*SNIP: Long running code. Insert our datapoints one at a time. Sometimes our messages die in the middle of this foreach. */ 
        } 
        Trace.TraceInformation("All datapoints written for call with dependable ID " + call.Call_ID); 
       Trace.TraceInformation("Call Processed successfully."); 
      } 
     } 
     catch (Exception e) 
     { 
      Trace.TraceInformation("Call Processing Failed. " + e.Message); 
     } 
    } 

    public override bool OnStart() 
    { 
     try 
     { 
      var connectionString = CloudConfigurationManager.GetSetting("Microsoft.ServiceBus.ConnectionString"); 
      _nManager = NamespaceManager.CreateFromConnectionString(connectionString); 
      _nManager.Settings.OperationTimeout = new TimeSpan(0,0,10,0); 
      var topic = new TopicDescription("MyTopic") 
      { 
       DuplicateDetectionHistoryTimeWindow = new TimeSpan(0, 0, 10, 0), 
       DefaultMessageTimeToLive = new TimeSpan(0, 0, 10, 0), 
       RequiresDuplicateDetection = true, 
      }; 
      if (!_nManager.TopicExists("MyTopic")) 
      { 
       _nManager.CreateTopic(topic); 
      } 
      if (!_nManager.SubscriptionExists("MyTopic", "AllMessages")) 
      { 
       _nManager.CreateSubscription("MyTopic", "AllMessages"); 
      } 
      _client = SubscriptionClient.CreateFromConnectionString(connectionString, "MyTopic", "AllMessages", 
       ReceiveMode.ReceiveAndDelete); 
      _options = new OnMessageOptions 
      { 
        AutoRenewTimeout = TimeSpan.FromMinutes(5), 

      }; 
      _options.ExceptionReceived += LogErrors; 
      CreateKernel(); 

      _callLogRepository.ExecuteSqlCommand(/*SNIP: Background processing*/); 
     } 
     catch (Exception e) 
     { 
      Trace.TraceInformation("Error on roleStart:" + e.Message + "---" + e.StackTrace); 
     } 
     return base.OnStart(); 
    } 

    public override void OnStop() 
    { 
     // Close the connection to Service Bus Queue 
     _client.Close(); 
     _completedEvent.Set(); 
    } 

    void LogErrors(object sender, ExceptionReceivedEventArgs e) 
    { 
     if (e.Exception != null) 
     { 
      Trace.TraceInformation("Error: " + e.Exception.Message + "---" + e.Exception.StackTrace); 
      _client.Close(); 
     } 
    } 

    public IKernel CreateKernel() 
    { 
     _kernel = new StandardKernel(); 
     /*SNIP: Bind NInjectable repositories */ 
     return _kernel; 
    } 
} 
+0

При выполнении отделки я наблюдаю, что рабочая роль ждет пару секунд, прежде чем снова включить OnStart и снова войти в Run() –

ответ

1

Ответ TheDude очень близок к правильному ответу! Оказывается, он прав, что метод запуска должен остаться в живых, а не сразу возвращаться. Однако с помощью механизма сообщений сообщения Azure Service Bus вы не можете поместить _client.onMessage (...) внутри цикла while, так как это приводит к ошибке (насос сообщений уже инициализирован).

На самом деле должно произойти событие ручной перезагрузки, которое должно быть создано до того, как начнется выполнение рабочей роли, а затем будет ждать выполнения кода насоса сообщения. Для документации по ManualResetEvent см. https://msdn.microsoft.com/en-us/library/system.threading.manualresetevent(v=vs.110).aspx. Кроме того, процесс описан здесь: http://www.acousticguitar.pro/questions/607359/using-queueclient-onmessage-in-an-azure-worker-role

Мой последний класс работника роль выглядит следующим образом:

public class WorkerRole : RoleEntryPoint 
{ 
    private static StandardKernel _kernel; 
    private readonly ManualResetEvent _completedEvent = new ManualResetEvent(false); 
    private BaseRepository<CallLog> _callLogRepository; 

    private SubscriptionClient _client; 
    private MessagingFactory _mFact; 
    private NamespaceManager _nManager; 
    private OnMessageOptions _options; 

    public override void Run() 
    { 
     ManualResetEvent CompletedEvent = new ManualResetEvent(false); 
     try 
     { 
      CallInformation callInfo; 
      // Initiates the message pump and callback is invoked for each message that is received, calling close on the client will stop the pump. 
      _client.OnMessage(message => 
      { 
       // Process message from subscription. 
       Trace.TraceInformation("Call Received. Ready to process message " + message.MessageId); 
       callInfo = message.GetBody<CallInformation>(); 
       WriteCallData(callInfo); 

       Trace.TraceInformation("Call Processed. Clearing from topic."); 
      }, _options); 
     } 
     catch (Exception e) 
     { 
      Trace.TraceInformation("Error: " + e.Message + "---" + e.StackTrace); 
     } 
     CompletedEvent.WaitOne(); 
    } 

private void writeCallData(List<CallInformation> callList) 
{ 
    try 
    { 
     Trace.TraceInformation("Calls received: " + callList.Count); 
     foreach (var callInfo in callList) 
     { 
      Trace.TraceInformation("Unwrapping call..."); 
      var call = callInfo.CallLog.Unwrap(); 
      Trace.TraceInformation("Begin Processing: Local Call " + call.ID + " with " + callInfo.DataPoints.Length + " datapoints"); 
      Trace.TraceInformation("Inserting Call..."); 
      _callLogRepository.ExecuteSqlCommand(/*SNIP: Insert call*/); 
       Trace.TraceInformation("Call entry written. Now building datapoint list..."); 
       var datapoints = callInfo.DataPoints.Select(datapoint => datapoint.Unwrap()).ToList(); 
       Trace.TraceInformation("datapoint list constructed. Processing datapoints..."); 
       foreach (var data in datapoints) 
       { 
        /*SNIP: Long running code. Insert our datapoints one at a time. Sometimes our messages die in the middle of this foreach. */ 
       } 
       Trace.TraceInformation("All datapoints written for call with dependable ID " + call.Call_ID); 
      Trace.TraceInformation("Call Processed successfully."); 
     } 
    } 
    catch (Exception e) 
    { 
     Trace.TraceInformation("Call Processing Failed. " + e.Message); 
    } 
} 

public override bool OnStart() 
{ 
    try 
    { 
     var connectionString = CloudConfigurationManager.GetSetting("Microsoft.ServiceBus.ConnectionString"); 
     _nManager = NamespaceManager.CreateFromConnectionString(connectionString); 
     _nManager.Settings.OperationTimeout = new TimeSpan(0,0,10,0); 
     var topic = new TopicDescription("MyTopic") 
     { 
      DuplicateDetectionHistoryTimeWindow = new TimeSpan(0, 0, 10, 0), 
      DefaultMessageTimeToLive = new TimeSpan(0, 0, 10, 0), 
      RequiresDuplicateDetection = true, 
     }; 
     if (!_nManager.TopicExists("MyTopic")) 
     { 
      _nManager.CreateTopic(topic); 
     } 
     if (!_nManager.SubscriptionExists("MyTopic", "AllMessages")) 
     { 
      _nManager.CreateSubscription("MyTopic", "AllMessages"); 
     } 
     _client = SubscriptionClient.CreateFromConnectionString(connectionString, "MyTopic", "AllMessages", 
      ReceiveMode.ReceiveAndDelete); 
     _options = new OnMessageOptions 
     { 
       AutoRenewTimeout = TimeSpan.FromMinutes(5), 

     }; 
     _options.ExceptionReceived += LogErrors; 
     CreateKernel(); 

     _callLogRepository.ExecuteSqlCommand(/*SNIP: Background processing*/); 
    } 
    catch (Exception e) 
    { 
     Trace.TraceInformation("Error on roleStart:" + e.Message + "---" + e.StackTrace); 
    } 
    return base.OnStart(); 
} 

public override void OnStop() 
{ 
    // Close the connection to Service Bus Queue 
    _client.Close(); 
    _completedEvent.Set(); 
} 

void LogErrors(object sender, ExceptionReceivedEventArgs e) 
{ 
    if (e.Exception != null) 
    { 
     Trace.TraceInformation("Error: " + e.Exception.Message + "---" + e.Exception.StackTrace); 
     _client.Close(); 
    } 
} 

public IKernel CreateKernel() 
{ 
    _kernel = new StandardKernel(); 
    /*SNIP: Bind NInjectable repositories */ 
    return _kernel; 
} 

}

Вы заметите присутствие ManualResetEvent и призыванием WaitOne() в конце моего метода Run. Надеюсь, кто-то найдет это полезным!

1

Ваш метод Run не идет на неопределенный срок. Он должен выглядеть следующим образом:

public override void Run() 
{ 
    try 
    { 
     Trace.WriteLine("WorkerRole entrypoint called", "Information"); 
     while (true) 
     { 
     // Add code here that runs in the role instance 
     } 

    } 
    catch (Exception e) 
    { 
     Trace.WriteLine("Exception during Run: " + e.ToString()); 
     // Take other action as needed. 
    } 
} 

Взятый из docs:

Выполнить считаются Основным методом для вашего приложения. Переопределение Метод Run не требуется; реализация по умолчанию никогда не возвращается . Если вы переопределите метод «Выполнить», ваш код должен блокировать на неопределенный срок. Если метод Run возвращается, то роль автоматически возвращается в , поднимая событие остановки и вызывая метод OnStop , чтобы ваши последовательности выключения могли быть выполнены до того, как роль отключена.

+0

Эй, спасибо за этот ответ. Мне очень понравилось то, что нужно было. Определенно репутация достойна, но я объясню в своем ответе. –