2011-01-19 4 views
7

Я хочу использовать Reactive Extensions для преобразования некоторых сообщений и их ретрансляции после небольшой задержки.Задержка и дедупликация с использованием реактивных расширений (Rx)

Сообщения выглядеть примерно так:

class InMsg 
{ 
    int GroupId { get; set; } 
    int Delay { get; set; } 
    string Content { get; set; } 
} 

Результат выглядит примерно так:

class OutMsg 
{ 
    int GroupId { get; set; } 
    string Content { get; set; } 
    OutMsg(InMsg in) 
    { 
     GroupId = in.GroupId; 
     Content = Transform(in.Content); // function omitted 
    } 
} 

Есть несколько требований:

  • Длина задержки зависит от содержимого сообщения.
  • У каждого сообщения есть GroupId
  • Если новое сообщение приходит с тем же GroupId, что и сообщение с задержкой, ожидающее передачи, тогда первое сообщение следует отбросить, а только второе сообщение будет передано после нового периода задержки.

Учитывая Наблюдаемые <InMsg> и отправляющий функция:

IObservable<InMsg> inMsgs = ...; 

void Send(OutMsg o) 
{ 
    ... // publishes transformed messages 
} 

Я понимаю, что я могу использовать Select, чтобы выполнить преобразование.

void SetUp() 
{ 
    inMsgs.Select(i => new OutMsg(i)).Subscribe(Send); 
} 
  • Как я могу подать сообщение указать медлит? (Обратите внимание, что это может/должно привести к нарушению доставки сообщений.)
  • Как я могу обменивать сообщения с тем же GroupId?
  • Является ли Rx способным решить эту проблему?
  • Есть ли другой способ решения этого?

ответ

7

Вы можете использовать GroupBy сделать IGroupedObservable, Delay задержать выход, и Switch, чтобы убедиться, что новые значения заменяют предыдущие значения в своей группе:

IObservable<InMsg> inMessages; 

inMessages 
    .GroupBy(msg => msg.GroupId) 
    .Select(group => 
     { 
      return group.Select(groupMsg => 
       { 
        TimeSpan delay = TimeSpan.FromMilliseconds(groupMsg.Delay); 
        OutMsg outMsg = new OutMsg(); // map InMsg -> OutMsg here 

        return Observable.Return(outMsg).Delay(delay); 
       }) 
       .Switch(); 
     }) 
     .Subscribe(outMsg => Console.Write("OutMsg received")); 

записка о выполнении: если сгруппированное значение получено после сообщение отправляется (т. е. после задержки), оно начнет новую задержку

+0

У меня была игра с этим, и это не совсем так, как я ожидал. Подписка получает «System.Collections.Generic.AnonymousObservable'1 [OutMsg]» – chillitom

+0

Похоже, вы не вызываете 'Switch'.Если вы «наведите указатель мыши» на «Select» в Visual Studio, он должен сказать вам, что он возвращает «IObservable ». Если он возвращает 'IObservable >', вы не вызываете Switch –

0

Рамка Rx решает задержку с использованием the Delay. Очередь с дедупликацией может быть решена путем применения регулярной сортировки LINQ после Delay, а затем выполнения DistinctUntilChanged.

Обновление: Я поддерживаю, подход задержки здесь не будет работать один. Вам как-то нужно помещать входящие сообщения в очередь за задержку. Это достигается с помощью метода расширения BufferWithTime. Этот метод будет возвращать списки сообщений, которые вы затем можете очистить для дубликатов, прежде чем публиковать их в следующем наблюдателе в очереди.