2016-11-28 4 views
0

Мой вопрос относится к оператору Rx и Catch. Допустим, у меня есть Timeout на моем наблюдаемом и каждый раз, когда происходит тайм-аут, я хотел бы воссоздать базовый наблюдаемый (Catch) и сделать то же самое (добавить тайм-аут и уловить).Бесконечный улов в реактивных выбросах

Ниже я вставил пример кода. Для целей этого примера время ожидания происходит каждые 2 секунды. Из моего наблюдения этот код не может работать бесконечно, так как после отдыха что-то держит ссылку на старые наблюдаемые остатки. Эти остатки накапливаются все время, когда вызывается Catch.

Самая подозрительная линия - последняя, ​​была какая-то собственная ссылка. Но я не могу себе представить, почему это может быть неправильно? Также есть ли способ создать наблюдаемую с помощью подобной логики, которая будет работать вечно?

public static IObservable<string> CreateReliableStream(this IObservable<string> targetObservable, Func<IObservable<string>> recreateObservable) 
    { 
     return targetObservable 
      .Timeout(TimeSpan.FromSeconds(2)) 
      .Catch<string, Exception>(exception => ReconnectOnError(exception, recreateObservable)); 
    } 

    private static IObservable<string> ReconnectOnError(Exception exception, Func<IObservable<string>> recreateObservable) 
    { 
     GC.Collect(); // For debug - make sure all unreferenced object are removed 

     return recreateObservable() 
      .Timeout(TimeSpan.FromSeconds(2)) 
      .Catch<string, Exception>(ex => ReconnectOnError(ex, recreateObservable)); 
    } 
+1

Является ли это .net вопрос или вопрос Java? Я смущен тегом rx-java ... –

+0

Это больше Rx-вопрос, поэтому rx-net и rx-java в порядке. Только пример кода находится в C# –

ответ

1

Я думаю, вы просто хотите использовать оператор Retry().

Я предполагаю, что ваша начальная последовательность совпадает с вашей последовательностью продолжения.

например.

Observable.Return(1).Concat(Observable.Throw<int>(new Exception())) 
    .Retry() 

Это будет работать в плотном бесконечном цикле.

Ваш код может в конечном итоге, как

createObservable() 
    .Timeout(TimeSpan.FromSeconds(2)) 
    .Retry() 
0

Вы могли бы сделать что-то подобное.

//done in Linqpad, where async Main is allowed. 
async void Main() 
{ 
    var source = new Subject<string>(); 
    var backup = new Subject<string>(); 
    var reliableStream = source.CreateReliableStream(() => backup); 
    reliableStream.Subscribe(s => Console.WriteLine($"Next: {s}"), e => Console.WriteLine($"Error: {e.Message}"),() => Console.WriteLine("Completed.")); 

    source.OnNext("sourceAbc"); 
    backup.OnNext("backupAbc"); 
    await Task.Delay(TimeSpan.FromSeconds(2.5)); 

    source.OnNext("sourceDef"); 
    backup.OnNext("backupDef"); 
    await Task.Delay(TimeSpan.FromSeconds(2.5)); 

    //Doesn't yield "Completed" because it's re-subscribing. 
    source.OnCompleted(); 
    backup.OnCompleted(); 

} 

public static class Ex 
{ 
    public static IObservable<string> CreateReliableStream(this IObservable<string> targetObservable, Func<IObservable<string>> recreateObservable) 
    { 
     return targetObservable 
      .Timeout(TimeSpan.FromSeconds(2)) 
      .Catch<string, Exception>(exception => ReconnectOnError(exception, recreateObservable)); 
    } 

    public static IEnumerable<IObservable<T>> InfiniteObservables<T>(Func<IObservable<T>> f) 
    { 
     while(true) 
      yield return f(); 
    } 

    private static IObservable<string> ReconnectOnError(Exception exception, Func<IObservable<string>> recreateObservable) 
    { 
     GC.Collect(); // For debug - make sure all unreferenced object are removed 

     return InfiniteObservables(recreateObservable) 
      .Select(o => o.Timeout(TimeSpan.FromSeconds(2))) 
      .OnErrorResumeNext(); 
    } 
} 

Вырабатывает следующий вывод:

Next: sourceAbc 
Next: sourceDef 
Next: backupHij 
Next: backupLmn 

Я не поклонник этого подхода, хотя. Rx обрабатывает ошибки, такие как потоковые терминаторы, и вы пытаетесь рассматривать их как альтернативное сообщение. Вы в конечном итоге плаваете вверх-рекой, как это.

 Смежные вопросы

  • Нет связанных вопросов^_^