2016-07-07 1 views
2

Я пытаюсь ставить в очередь ряд задач и запускать их асинхронно с использованием Azure Service Fabric. В настоящее время я использую CloudMessageQueue с рабочими ролями. Я пытаюсь перейти на Service Fabric. Из ролей рабочих, вот мой код:Azure Service Fabric Message Queue

private void ExecuteTask() 
    { 
     CloudQueueMessage message = null; 

     if (queue == null) 
     { 
      jmaLogProvider.WriteToLog(new ErrorMessage(MessageSeverity.Error, String.Format("Queue for WorkerRole2 is null. Exiting."))); 
      return; 
     } 

     try 
     { 
      message = queue.GetMessage(); 
      if (message != null) 
      { 
       JMATask task = GetTask(message.AsString); 
       string msg = (message == null) ? string.Empty : message.AsString; 
       //jmaLogProvider.WriteToLog(new ErrorMessage(MessageSeverity.JMA, String.Format("Executing task {0}", msg))); 
       queue.DeleteMessage(message); 
       PerformTask(task); 
      } 
     } 
     catch (Exception ex) 
     { 
      string msg = (message == null) ? string.Empty : message.AsString; 
      jmaLogProvider.WriteToLog(new ErrorMessage(MessageSeverity.Error, String.Format("Message {0} Error removing message from queue {1}", msg, ex.ToString()))); 
     } 
    } 

У меня есть несколько вопросов:

  1. Как запустить преформы метод задачи асинхронно? Я хочу одновременно запускать около 30-40 задач.
  2. У меня есть список JMATask. Как добавить список в очередь?
  3. Должен ли список быть добавлен в очередь?

    namespace Stateful1 
    { 
        public class JMATask 
        { 
        public string Name { get; set; } 
        } 
    
    /// <summary> 
    /// An instance of this class is created for each service replica by the Service Fabric runtime. 
    /// </summary> 
    internal sealed class Stateful1 : StatefulService 
    { 
    public Stateful1(StatefulServiceContext context) 
        : base(context) 
    { } 
    
    /// <summary> 
    /// Optional override to create listeners (e.g., HTTP, Service Remoting, WCF, etc.) for this service replica to handle client or user requests. 
    /// </summary> 
    /// <remarks> 
    /// For more information on service communication, see http://aka.ms/servicefabricservicecommunication 
    /// </remarks> 
    /// <returns>A collection of listeners.</returns> 
    protected override IEnumerable<ServiceReplicaListener> CreateServiceReplicaListeners() 
    { 
        return new ServiceReplicaListener[0]; 
    } 
    
    /// <summary> 
    /// This is the main entry point for your service replica. 
    /// This method executes when this replica of your service becomes primary and has write status. 
    /// </summary> 
    /// <param name="cancellationToken">Canceled when Service Fabric needs to shut down this service replica.</param> 
    protected override async Task RunAsync(CancellationToken cancellationToken) 
    { 
        // TODO: Replace the following sample code with your own logic 
        //  or remove this RunAsync override if it's not needed in your service. 
    
        IReliableQueue<JMATask> tasks = await this.StateManager.GetOrAddAsync<IReliableQueue<JMATask>>("JMATasks"); 
        //var myDictionary = await this.StateManager.GetOrAddAsync<IReliableDictionary<string, long>>("myDictionary"); 
    
        while (true) 
        { 
         cancellationToken.ThrowIfCancellationRequested(); 
    
         using (var tx = this.StateManager.CreateTransaction()) 
         { 
          var result = await tasks.TryDequeueAsync(tx); 
    
          //how do I execute this method async? 
          PerformTask(result.Value); 
    
          //Create list of JMA Tasks to queue up 
          await tasks.EnqueueAsync(tx, new JMATask()); 
    
          //var result = await myDictionary.TryGetValueAsync(tx, "Counter"); 
    
          //ServiceEventSource.Current.ServiceMessage(this, "Current Counter Value: {0}", 
          // result.HasValue ? result.Value.ToString() : "Value does not exist."); 
    
          //await myDictionary.AddOrUpdateAsync(tx, "Counter", 0, (key, value) => ++value); 
    
          // If an exception is thrown before calling CommitAsync, the transaction aborts, all changes are 
          // discarded, and nothing is saved to the secondary replicas. 
          await tx.CommitAsync(); 
         } 
    
         await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken); 
        } 
    } 
    
    private async void PerformTask(JMATask task) 
    { 
        //execute task 
    } 
    

    }

+0

Борясь с подобной проблемой, и это было полезно, спасибо. Любопытно, в чем вы оказались и как это сработало для вас. – kenchilada

+1

Я использовал другую технологию: https://msdn.microsoft.com/en-us/library/dn568104.aspx –

ответ

1

RunAsync не должен иметь эту строку коды: await tasks.EnqueueAsync(tx, new JMATask());

Создать список ИХ задач в очередь должна быть другим способом в вашем отслеживании состояния сервиса, который выглядит следующим образом:

public async Task AddJMATaskAsync(JMATask task) 
    { 
     var tasksQueue = await StateManager.GetOrAddAsync<IReliableQueue<JMATask>>("JMATasks"); 
     using (var tx = StateManager.CreateTransaction()) 
     { 
      try 
      { 
       await tasksQueue.EnqueueAsync(tx, request); 
       await tx.CommitAsync(); 
      } 
      catch (Exception ex) 
      { 
       tx.Abort(); 
      } 
     } 
    } 

и тогда ваши метод PerformTask может содержать вызов без гражданства microservice:

public async Task PerformTask (JMATask task) 
    { 
     //1. resolve stateless microservice URI 
     // statelessSvc 

     //2. call method of the stateless microservice 
     // statelessSvc.PerformTask(task); 
    } 

Так в основном, с сохранением состояния службы будет только queu e и удалить задания. Выполнение фактической задачи может выполняться с помощью микросервиса, который будет доступен для всех узлов в кластере.

1

Вы можете создать список задач, а затем сделать ждут Task.WhenAll (TaskList);

Это, вероятно, самый простой прямой ответ.

Однако, если каждая задача несколько отличается, считали ли вы создание индивидуальных микросервисов для каждой из задач?

метод
+0

Да, каждая задача отличается. Как создать отдельные микросервисы? –

+0

В идеале создайте одну службу (состояние, без состояния) для каждой задачи. Трудно ответить точно без глубокого понимания, но это будет моя реакция на кишок. Каждая служба должна иметь простую единую ответственность и быть действительно хороша в том, что она должна делать. –

+0

Я обновил свой пост. Задачи перемещают данные из одной системы в другую. Существует несколько сотен различных задач, но многие из них выполняют один и тот же тип действия. –

 Смежные вопросы

  • Нет связанных вопросов^_^