Я хочу использовать Reactive Extensions для преобразования некоторых сообщений и их ретрансляции после небольшой задержки.Задержка и дедупликация с использованием реактивных расширений (Rx)
Сообщения выглядеть примерно так:
class InMsg
{
int GroupId { get; set; }
int Delay { get; set; }
string Content { get; set; }
}
Результат выглядит примерно так:
class OutMsg
{
int GroupId { get; set; }
string Content { get; set; }
OutMsg(InMsg in)
{
GroupId = in.GroupId;
Content = Transform(in.Content); // function omitted
}
}
Есть несколько требований:
- Длина задержки зависит от содержимого сообщения.
- У каждого сообщения есть GroupId
- Если новое сообщение приходит с тем же GroupId, что и сообщение с задержкой, ожидающее передачи, тогда первое сообщение следует отбросить, а только второе сообщение будет передано после нового периода задержки.
Учитывая Наблюдаемые <InMsg> и отправляющий функция:
IObservable<InMsg> inMsgs = ...;
void Send(OutMsg o)
{
... // publishes transformed messages
}
Я понимаю, что я могу использовать Select, чтобы выполнить преобразование.
void SetUp()
{
inMsgs.Select(i => new OutMsg(i)).Subscribe(Send);
}
- Как я могу подать сообщение указать медлит? (Обратите внимание, что это может/должно привести к нарушению доставки сообщений.)
- Как я могу обменивать сообщения с тем же GroupId?
- Является ли Rx способным решить эту проблему?
- Есть ли другой способ решения этого?
У меня была игра с этим, и это не совсем так, как я ожидал. Подписка получает «System.Collections.Generic.AnonymousObservable'1 [OutMsg]» – chillitom
Похоже, вы не вызываете 'Switch'.Если вы «наведите указатель мыши» на «Select» в Visual Studio, он должен сказать вам, что он возвращает «IObservable». Если он возвращает 'IObservable >', вы не вызываете Switch –