2016-10-17 5 views
2

У меня есть работа по обработке данных, которая состоит из примерно 20 последовательных шагов. Шаги все подпадают под одну из трех категорий:C# TPL: Возможно перезапустить неудавшийся трубопровод на произвольном шаге?

  1. сделать некоторые файл манипуляции
  2. импорт/экспорт данных из базы данных
  3. сделать вызов на третьей стороной веб-API

I» ve реорганизовал код из одного длинного, ужасного метода в шаблон трубопровода, используя примеры here и here. Все шаги являются TransformBlock, такие как

var stepThirteenPostToWebApi = new TransformBlock<FileInfo, System.Guid>(async csv => 
{ 
dynamic task = await ApiUtils.SubmitData(csv.FullName); 
return task.guid; 
}); 

Код работает большую часть времени, но иногда шаг в трубопроводе не может по каким-либо причинам - скажем, поврежденный файл не может быть прочитан на шаге 6 20 (просто пример - любой шаг может завершиться неудачей). Конвейер прекращает выполнение дополнительных задач, как и следовало ожидать.

Однако сторонний веб-API представляет собой сложную задачу - мы взимаем плату за каждое задание, которое мы инициируем, выполним ли мы все 20 шагов или только первый.

Я хотел был бы исправить все, что пошло не так на шаге проблемы (опять же, для нашего примера предположим, что я исправляю поврежденный файл на шаге 6 из 20), затем заберите резервную копию на шаге 6. Третья сторона веб-API имеет GUID для каждого задания и является асинхронным, так что это должно быть хорошо - после устранения проблемы он с удовольствием возобновит работу с оставшимися шагами.

Мой вопрос: Возможно ли (и если это целесообразно?) Разработать конвейер, который может начинаться с любого шага, если предпосылки для этого шага действительны?

Это будет выглядеть примерно так:

  1. работа терпит неудачу на этапе 6 и журналы шаг 5 как последний успешный шаг
  2. человек приходит и исправляет любые вызванные шаг 6 на провал
  3. в новый трубопровод начинается на этапе 6

Я понимаю, грубой силы способ будет иметь StartAtStep2(), StartAtStep3(), StartAtStep4() методов. Это не похоже на хороший дизайн, но я немного новичок в этом шаблоне, поэтому, возможно, это приемлемо.

+0

перебор путь не так уж плохо, например, ваш выше код просто необходимо будет 'BOOL StartAtStepThirteen (FileInfo информация) {вернуться stepThirteenPostToWebApi.Post (информация); } ' –

+0

Я не понимаю, как это будет выглядеть ... мне не нужно будет по существу дублировать весь код с шагов N до конца в каждом методе? Если так, что кажется проблематичным для поддержания. – GojiraDeMonstah

+0

Настройка цепочки должна быть отдельным способом, чем выполнение цепочки. Вы должны сохранить 'stepThirteenPostToWebApi' в переменной уровня класса в классе, представляющем всю цепочку (настройка цепочки может быть выполнена в конструкторе класса). –

ответ

3

перебора путь не так уж плохо, например, ваш выше код просто необходим будет

bool StartAtStepThirteen(FileInfo csv) 
{ 
    return stepThirteenPostToWebApi.Post(csv); 
} 

Настройка цепи должна быть отдельным методом, чем выполнение цепочки. Вы должны сохранить stepThirteenPostToWebApi в переменной уровня класса в классе, представляющем всю цепочку, настройку цепочки можно выполнить в конструкторе класса.

Вот простая трехэтапная версия процесса. При возникновении ошибки вместо сбоя в цепочке задач я регистрирую ошибку и передаю null по цепочке для недопустимых записей. Вы можете заставить этот метод журнала вывести событие, а затем пользователь может решить, что делать с плохой записью.

public class WorkChain 
{ 
    private readonly TransformBlock<string, FileInfo> stepOneGetFileInfo; 
    private readonly TransformBlock<FileInfo, System.Guid?> stepTwoPostToWebApi; 
    private readonly ActionBlock<System.Guid?> stepThreeDisplayIdToUser; 

    public WorkChain() 
    { 
     stepOneGetFileInfo = new TransformBlock<string, FileInfo>(new Func<string, FileInfo>(GetFileInfo)); 
     stepTwoPostToWebApi = new TransformBlock<FileInfo, System.Guid?>(new Func<FileInfo, Task<Guid?>>(PostToWebApi)); 
     stepThreeDisplayIdToUser = new ActionBlock<System.Guid?>(new Action<Guid?>(DisplayIdToUser)); 

     stepOneGetFileInfo.LinkTo(stepTwoPostToWebApi, new DataflowLinkOptions() {PropagateCompletion = true}); 
     stepTwoPostToWebApi.LinkTo(stepThreeDisplayIdToUser, new DataflowLinkOptions() {PropagateCompletion = true}); 
    } 

    public void PostToStepOne(string path) 
    { 
     bool result = stepOneGetFileInfo.Post(path); 
     if (!result) 
     { 
      throw new InvalidOperationException("Failed to post to stepOneGetFileInfo"); 
     } 
    } 

    public void PostToStepTwo(FileInfo csv) 
    { 
     bool result = stepTwoPostToWebApi.Post(csv); 
     if (!result) 
     { 
      throw new InvalidOperationException("Failed to post to stepTwoPostToWebApi"); 
     } 
    } 

    public void PostToStepThree(Guid id) 
    { 
     bool result = stepThreeDisplayIdToUser.Post(id); 
     if (!result) 
     { 
      throw new InvalidOperationException("Failed to post to stepThreeDisplayIdToUser"); 
     } 
    } 

    public void CompleteAdding() 
    { 
     stepOneGetFileInfo.Complete(); 
    } 

    public Task Completion { get { return stepThreeDisplayIdToUser.Completion; } } 


    private FileInfo GetFileInfo(string path) 
    { 
     try 
     { 
      return new FileInfo(path); 
     } 
     catch (Exception ex) 
     { 
      LogGetFileInfoError(ex, path); 
      return null; 
     } 

    } 

    private async Task<Guid?> PostToWebApi(FileInfo csv) 
    { 
     if (csv == null) 
      return null; 
     try 
     { 
      dynamic task = await ApiUtils.SubmitData(csv.FullName); 
      return task.guid; 
     } 
     catch (Exception ex) 
     { 
      LogPostToWebApiError(ex, csv); 
      return null; 
     } 
    } 

    private void DisplayIdToUser(Guid? obj) 
    { 
     if(obj == null) 
      return; 

     Console.WriteLine(obj.Value); 
    } 

}