2017-01-20 5 views
1

Я создаю интеграционный тест, в котором я использую InMemNetwork для запуска теста.Rebus - Запустите модульные тесты и дождитесь, пока Rebus завершит все потоки

Существует вызов Thread.Sleep непосредственно перед утверждением, но это извращенный способ тестирования, и это замедляет наши тесты.

Я также выполняю некоторые интеграционные тесты, используя SagaFixtures и простую реализацию IBus, которая работает синхронно, но все это немного утомительно с регистрацией обработчиков, запуском обработчиков и отсрочкой сообщений.

Есть ли способ подождать на всех потоках, используемых Ребусом, до тех пор, пока они не закончатся, не увеличивая производственный код, используя такие вещи, как ManualResetEvent (используется в собственных тестах Rebus)?

ответ

0

Как обычно, я использую SagaFixture, а затем я использую FakeBus, чтобы вводить в обработчики саги, чтобы захватить их действия.

Большинство моих тестов - это модульные тесты простых обработчиков, но я часто буду вводить «настоящую» услугу, например, например. реализация IThis и IThat, которые поступают в настоящую базу данных.

Для нескольких сценариев, хотя я разворачиваю несколько конечных точек с транспортом in-mem, а затем я обычно реализую расширение на InMemNetwork, что помогает мне ждать публикации определенных событий или что-то в этом роде - это может выглядеть это в тесте:

var updated = await Network.WaitForNext<WhateverUpdated>(subscriberAddress, timeoutSeconds: 20); 

где WaitForNext просто метод расширения, который опрашивает очередь определяется subscriberAddress для следующего сообщения и пытается десериализация как WhateverUpdated.

Я надеюсь, что может дать вам некоторое вдохновение :)

+0

Создание метода расширения для этого было бы идеальным решением. Большое спасибо! –

0

Для некоторых сценариев х я использовать следующий подход ждать Ребус, чтобы завершить всю обработку сообщений. Конечные точки ребуса размещаются в отдельных exe-файлах, а транспорт для файловой системы ребуса используется для тестов интеграции (обычно это Azure SB). Интеграционный тест объединяет exe и в каждом exe. Ребус настроен на 0 работников, поэтому он ничего не делает. Затем в тесте у нас есть метод WaitForMessagesProcessed(), который настраивает число рабочих и блоков до тех пор, пока не будет обработано больше сообщений.

Вот как это примерно выглядит в коде:

public class MessageProcessor() { 
    private string queueName; 
    private int messagesWaitingForProcessing; 
    private IBus bus; 

    public MessageProcessor(string queueName) { 
     this.queueName = queueName; 

     this.bus = Configure.With(adapter) 
     .Transport(t => t.UseFileSystem(@"c:\baseDirectory", this.queueName)) 
     .Options(o => 
     { 
      o.SetNumberOfWorkers(0); 
     }) 
     .Events(e => 
     { 
      e.BeforeMessageSent += (thebus, headers, message, context) => 
      { 
       // When sending to itself, the message is not queued on the network. 
       var m = context.Load<Rebus.Pipeline.Send.DestinationAddresses>(); 
       if (m.Any(t => t == this.queueName)) 
        this.messagesWaitingForProcessing++; 
      }; 

      e.AfterMessageHandled += (thebus, headers, message, context, args) => 
      { 
       this.messagesWaitingForProcessing--; 
      }; 
     }) 
     .Start(); 
    } 

    public async Task WaitForMessagesProcessed() 
    { 
     this.DetermineMessagesWaitingForProcessing(); 
     while (this.messagesWaitingForProcessing > 0) 
     { 
      this.bus.Advanced.Workers.SetNumberOfWorkers(2); 

      while (this.messagesWaitingForProcessing > 0) 
      { 
       await Task.Delay(100); 
      } 

      this.bus.Advanced.Workers.SetNumberOfWorkers(0); 

      this.DetermineMessagesWaitingForProcessing(); 
     } 
    } 

    public void DetermineMessagesWaitingForProcessing() { 
     this.messagesWaitingForProcessing = Directory.GetFiles(GetDirectoryForQueueNamed(this.queueName),  "*.rebusmessage.json").Count(); 
    } 

    private static string GetDirectoryForQueueNamed(string queueName) 
    { 
     return Path.Combine(this.baseDiretory, queueName); 
    } 
} 

Испытание может быть как

[TestMethod] 
public void Test() { 
    var endpoint1 = LaunchExe("1"); 
    var endpoint2 = LaunchExe("2"); 

    endPoint1.DoSomeAction(); 

    endPoint1.WaitForMessagesProcessed(); 

    Assert.AreEqual("expectation", endPoint1.Query()); 
}