2016-10-27 5 views
2

Я использую RX 2.2.5. У меня есть 2 мнения, которые загружаются суб заказов сRx.Net GroupBy реализация отсутствует последовательность элементов редко

  _transportService 
      .ObserveSubOrder(parentOrder.OrderId) 
      .SubscribeOn(_backgroundScheduler) 
      .ObserveOn(_uiScheduler) 
      .Where(subOs => subOs != null)     
      .Snoop("BeforeGrpBy") 
      .GroupBy(subOs => subOs.OrderId) 
      .Subscribe(subOrdUpdates => 
      { 
       AddIfNew(subOrdUpdates.Key, subOrdUpdates.Snoop("AfterGrpBy" + "--" + subOrdUpdates.Key));       
      }) 

Перед группеГо он получит всю последовательность элементов, проблема возникает после того, как GroupBy, что он пропускает очень редко последовательность элементов. Я не думаю, что это проблема параллелизма, как видно из журналов. Пользовательский метод расширения Snoop используется для генерации этих журналов. Время

16:15:44.8169968 : (1) : BeforeGrpBy: OnNext({ OrderId = 9Zsj8Z4sTRb, OrderType = WNX6, Quantity = 10, Price = 178.78125}) 
16:15:44.8169968 : (1) : AfterGrpBy--9Zsj8Z4sTRb: Observable obtained 
16:15:44.8369988 : (9) : AfterGrpBy--9Zsj8Z4sTRb: Subscribed to on. 
16:15:44.8379989 : (1) : BeforeGrpBy: OnNext({ OrderId = 9Zsj8Z4sTRb, OrderType = WNX6, Quantity = 10, Price = 178.78125}) 
16:15:44.8379989 : (9) : AfterGrpBy--9Zsj8Z4sTRb: Subscription completed. 
16:15:44.8590010 : (1) : AfterGrpBy--9Zsj8Z4sTRb: Observable obtained 
16:15:44.8600011 : (9) : AfterGrpBy--9Zsj8Z4sTRb: Subscribed to on. 
16:15:44.8610012 : (9) : AfterGrpBy--9Zsj8Z4sTRb: Subscription completed. 
16:15:44.8620013 : (1) : AfterGrpBy--9Zsj8Z4sTRb: OnNext({ OrderId = 9Zsj8Z4sTRb, OrderType = WNX6, Quantity = 10, Price = 178.78125}) 

Формат: (Thread): Msg

Как вы можете видеть, прежде чем GroupBy onNext вызывается дважды, но после того, как он пропустил один. Что-то не так с грамматикой Rx здесь или это известная проблема? Любое понимание поможет? Если какое-либо дополнительное разъяснение требуется, пожалуйста, прокомментируйте.

Update: Добавление работы/желательные журналы:

16:15:45.1070258 : (1) : BeforeGrpBy: OnNext({ OrderId = 44Fqp3ubNmL, OrderType = TTT6, Quantity = 39, Price = 130.21875}) 
16:15:45.1280279 : (1) : AfterGrpBy--44Fqp3ubNmL: Observable obtained 
16:15:45.1310282 : (10) : AfterGrpBy--44Fqp3ubNmL: Subscribed to on. 
16:15:45.1320283 : (10) : AfterGrpBy--44Fqp3ubNmL: Subscription completed. 
16:15:45.1320283 : (1) : AfterGrpBy--44Fqp3ubNmL: OnNext({ OrderId = 44Fqp3ubNmL, OrderType = TTT6, Quantity = 39, Price = 130.21875}) 
16:15:45.1330284 : (1) : BeforeGrpBy: OnNext({ OrderId = 44Fqp3ubNmL, OrderType = TTT6, Quantity = 39, Price = 130.21875}) 
16:15:45.1330284 : (1) : AfterGrpBy--44Fqp3ubNmL: Observable obtained 
16:15:45.1340285 : (10) : AfterGrpBy--44Fqp3ubNmL: Subscribed to on. 
16:15:45.1340285 : (10) : AfterGrpBy--44Fqp3ubNmL: Subscription completed. 
16:15:45.1350286 : (1) : AfterGrpBy--44Fqp3ubNmL: OnNext({ OrderId = 44Fqp3ubNmL, OrderType = TTT6, Quantity = 39, Price = 130.21875}) 

UPDATE2: Возможная ошибка или особенность

GroupBy выстреливает groupedObservable только если fireNewMapEntry истинно, (GroupBy.cs) и это происходит здесь

if (!_map.TryGetValue(key, out writer)) 
{ 
    writer = new Subject<TElement>(); 
    _map.Add(key, writer); 
    fireNewMapEntry = true; 
    } 

где _map имеет тип Dictionary<TKey, ISubject<TElement>>. Это может быть проблема?

ответ

0

Вам не задан характер GroupBy.

Оператор излучает OnNext только после появления новой группы (см. Реализацию GroupBy.cs:67). В вашем случае orderID равно для обоих уведомлений, поэтому излучается только один OnNext.

Значение, выданное оператором, составляет IGroupedObservable<T>, на которое вы можете подписаться, если вам нужен доступ к дальнейшим уведомлениям внутри группы.

+0

Как я уже говорил в вопросе это происходит редко, почти раз в 100 .. и если вы видите журналы после groupby, получается 2 разных наблюдаемых, которые должны приводить к 2 различным группам, которые могут быть связаны, и, следовательно, Я ожидаю 2 onNext .. Я обновлю вопрос с журналами для рабочих случаев. – MKMohanty

+0

Я предлагаю вам делать заметки с @LeeCampbell, чтобы улучшить стиль кодирования ваших запросов. Из вашего стиля подписки, вероятно, скрывается какое-то условие гонки, которое создает «случайность». Я считаю, что мой ответ верен, так как «GroupBy», конечно же, создает новый «IGroupedObsevable» для того же ключа после того, как подписка была удалена и продолжена снова. – supertopi

1

Просто некоторые заметки о вашем стиле кода (извините, это не совсем ответ, как я думаю, @supertopi ответил)

  1. переместит вас SubscribeOn и ObserveOn вызовы быть последним, что вы делаете, прежде чем ваша окончательная подписка. В вашем текущем коде вы выполняете Where, Snoop и GroupBy все на _uiScheduler, занимая драгоценные циклы.

  2. Избегайте подписки на подписку. Похоже, что AddIfNew берет ключ и IObservable<T>, поэтому я предполагаю, что он выполняет некоторую подписку внутри. Вместо этого опирайтесь на то, что вы знаете. Если вы используете GroupBy, то вы знаете, что ключ будет уникальным при первом приеме группы. Таким образом, теперь это может быть просто Add (если это ключ, который вы проверяете). Вы также можете использовать Take(1), если хотите быть явным. Если это значение не является ключом, который вы проверяете, то GroupBy представляется излишним.

  3. Try, чтобы держать вас имена переменных в соответствии с тем, другой разработчик читает через запрос они руководствуются красиво, вместо того, чтобы прыгать между subOs, childOs и childUpdates, когда childOrder, кажется, лучшее название (IMO)

  4. Идеально не возвращать нулевые значения в вашей наблюдаемой последовательности. Какой цели это служит? Это может иметь смысл в некоторых редких случаях, но часто я обнаруживаю, что вместо OnCompleted используется значение null, чтобы указать, что для этой последовательности нет значений.

например.

_transportService 
     .ObserveSubOrder(parentOrder.OrderId) 
     .Where(childOrder => childOrder != null)     
     .Snoop("BeforeGrpBy") 
     .GroupBy(childOrder => childOrder.OrderId) 
     .SelectMany(grp => grp.Take(1).Select(childOrder=>Tuple.Create(grp.key, childOrder)) 
     .SubscribeOn(_backgroundScheduler) 
     .ObserveOn(_uiScheduler) 
     .Subscribe(newGroup => 
     { 
      Add(newGroup.Item1, newGroup.Item2);       
     }, 
      ex=>//obviously we have error handling here ;-) 
     ); 

или

_transportService 
     .ObserveSubOrder(parentOrder.OrderId) 
     .Where(childOrder => childOrder != null)     
     .Snoop("BeforeGrpBy") 
     .SubscribeOn(_backgroundScheduler) 
     .ObserveOn(_uiScheduler) 
     .Subscribe(childOrder => 
      { 
      AddIfNew(childOrder.OrderId, childOrder);        
      }, 
      ex=>//obviously we have error handling here ;-) 
     ); 

и даже лучше (без подглядывать и нулевые чеки)

var subscription = _transportService 
     .ObserveSubOrder(parentOrder.OrderId) 
     .SubscribeOn(_backgroundScheduler) 
     .ObserveOn(_uiScheduler) 
     .Subscribe(
      childOrder => AddIfNew(childOrder.OrderId, childOrder), 
      ex=>//obviously we have error handling here ;-) 
     ); 

НТН

+1

Взятая ур точка 3. Для пункта 4. Я не могу контролировать генерацию последовательности в наблюдаемой, поскольку она исходит из другой сборки в рамках. Для пункта 2. Избегайте подписки на подписку! (Whats d недостаток здесь) Снова пункт 1, Согласился, что он может быть удален, но опять же он вызывает проблему параллелизма и, следовательно, отсутствует элементов? – MKMohanty

+0

И фактическая реализация не имеет Snoop, ее просто для отладки :), но имеют нулевую проверку по вышеуказанной причине. – MKMohanty

+0

Подписывание в рамках подписки означает, что вы, скорее всего, будете отбрасывать подписки на пол (и пошатывать не попробовать) , что означает утечку памяти. Вы также вряд ли будете иметь дополнительный уровень обработки ошибок. Вместо этого создайте потоки в запросе. –