2016-06-21 16 views
1

У меня была ошибка в моем производственном коде, который я отслеживал и сумел создать тестовый пример для воспроизведения. Я создаюКак остановить утечку IDisposables с помощью IObservable <IDisposable>?

IObservable<IDisposable> 

экземпляры и использование серийного одноразового использования в подписке для хранения не более одного элемента в живых. Это простой способ добавить графические объекты в сцену и удалить их, когда доступны обновления.

Однако следующий тестовый пример показывает тонкую ошибку.

using System; 
using System.Reactive.Concurrency; 
using System.Reactive.Disposables; 
using System.Reactive.Linq; 
using FluentAssertions; 
using Microsoft.Reactive.Testing; 
using Xunit; 

namespace WeinCadSW.Spec 
{ 
    /// <summary> 
    /// This test case demonstrates problems with streams of IDisposables. 
    /// http://stackoverflow.com/questions/37936537/how-to-stop-leaking-idisposables-with-an-iobservableidisposable 
    /// </summary> 
    public class ObservableDisposableSpec : ReactiveTest 
    { 
     TestScheduler _Scheduler = new TestScheduler(); 
     [Fact] 
     public void ShouldWork() 
     { 
      var o = _Scheduler.CreateHotObservable 
       (OnNext(100, "A") 
       , OnNext(200, "B") 
       , OnNext(250, "C") 
       , OnNext(255, "D") 
       , OnNext(258, "E") 
       , OnNext(600, "F") 
       ); 

      var disposablesCreated = 0; 
      var disposabledDisposed = 0; 
      var oo = o.Select 
       (s => 
       { 
        disposablesCreated++; 
        return Disposable.Create(() => disposabledDisposed++); 
       }) 
       .Delay(TimeSpan.FromTicks(10), _Scheduler); 


      IDisposable sub = Disposable.Empty; 
      _Scheduler.ScheduleAbsolute(null, 0, (Func<IScheduler, object, IDisposable>)((scheduler, state) => 
      { 
       sub = oo.SubscribeDisposable(); 
       return Disposable.Empty; 
      })); 
      _Scheduler.ScheduleAbsolute(null, 605, (Func<IScheduler, object, IDisposable>)((scheduler, state) => 
      { 
       sub.Dispose(); 
       return Disposable.Empty; 
      })); 

      _Scheduler.Start(); 

      // This test will fail here because 6 disposables are created. 
      disposablesCreated.Should().Be(6); 
      disposabledDisposed.Should().Be(6); // but is actually 5 

     } 

    } 

и метод SubscribeDisposable, который находится в центре этой проблемы.

public static class Extensions 
    { 

     public static IDisposable SubscribeDisposable (this IObservable<IDisposable> o) 
     { 
      var d = new SerialDisposable(); 

      var s = o.Subscribe(v => 
      { 
       d.Disposable = v; 
      }); 

      return new CompositeDisposable(s, d); 

     } 


    } 

} 

Когда я распоряжаюсь подпиской, генерируется еще один IDisposable и никогда не отправляется на подписку.

6 одноразовых предметов генерируются, но один просочился. Это из-за задержки, которую я ввел в систему для моделирования задержек планирования в реальной системе.

Так что мой вопрос.

Возможно ли написать ПодписатьсяDisposable, аналогично вышесказанному, что не будет утечка IDisposables.

+0

Я не понимаю реактивного, но где используется 'SubscribeDisposable'? – Euphoric

+0

Я обновил тестовый пример, чтобы полностью продемонстрировать SubscribeDisposable. – bradgonesurfing

+0

Разве это не потому, что последнее событие находится в момент 600, но субпозиция расположена на отметке 605, что означает, что она никогда не будет удалена? – Euphoric

ответ

0

Я твердо убежден, что решения вышеупомянутого вопроса не существует из-за контрактов, которые обеспечивает RX. Поэтому я обновил свой код, чтобы избежать описанного выше шаблона. Вместо этого я использую.

/// <summary> 
    /// Subscribes to the observable sequence and manages the disposables 
    /// with a serial disposable. That 
    /// is before the function is called again the previous disposable is disposed. 
    /// </summary> 

    public static IDisposable SubscribeDisposable<T> 
    (this IObservable<T> o, Func<T, IDisposable> fn, Action<Exception> errHandler) 
    { 
     var d = new SerialDisposable(); 

     var s = o.Subscribe(v => 
     { 
       d.Disposable = Disposable.Empty; 
       d.Disposable = fn(v) ?? Disposable.Empty; 
     }, onError:errHandler); 

     return new CompositeDisposable(s,d); 

    } 

Таким образом

IObservable<IDisposable> 

удаляется из моего кода, как это кажется опасным.