Мой вопрос относится к оператору Rx и Catch. Допустим, у меня есть Timeout на моем наблюдаемом и каждый раз, когда происходит тайм-аут, я хотел бы воссоздать базовый наблюдаемый (Catch) и сделать то же самое (добавить тайм-аут и уловить).Бесконечный улов в реактивных выбросах
Ниже я вставил пример кода. Для целей этого примера время ожидания происходит каждые 2 секунды. Из моего наблюдения этот код не может работать бесконечно, так как после отдыха что-то держит ссылку на старые наблюдаемые остатки. Эти остатки накапливаются все время, когда вызывается Catch.
Самая подозрительная линия - последняя, была какая-то собственная ссылка. Но я не могу себе представить, почему это может быть неправильно? Также есть ли способ создать наблюдаемую с помощью подобной логики, которая будет работать вечно?
public static IObservable<string> CreateReliableStream(this IObservable<string> targetObservable, Func<IObservable<string>> recreateObservable)
{
return targetObservable
.Timeout(TimeSpan.FromSeconds(2))
.Catch<string, Exception>(exception => ReconnectOnError(exception, recreateObservable));
}
private static IObservable<string> ReconnectOnError(Exception exception, Func<IObservable<string>> recreateObservable)
{
GC.Collect(); // For debug - make sure all unreferenced object are removed
return recreateObservable()
.Timeout(TimeSpan.FromSeconds(2))
.Catch<string, Exception>(ex => ReconnectOnError(ex, recreateObservable));
}
Является ли это .net вопрос или вопрос Java? Я смущен тегом rx-java ... –
Это больше Rx-вопрос, поэтому rx-net и rx-java в порядке. Только пример кода находится в C# –