У меня были проблемы с рекурсивной цепочкой наблюдаемых.RxJS: Рекурсивный список наблюдаемых и одиночных наблюдателей
Я работаю с RxJS, который в настоящее время находится в версии 1.0.10621, и содержит основную функциональность Rx в сочетании с Rx для jQuery.
Позвольте мне представить пример сценария для моей проблемы: Я опроса Twitter search API (ответ JSON) для твитов/обновлений, содержащих определенное ключевое слово. Ответ также включает в себя «refresh_url», который следует использовать для генерации последующего запроса. Ответ на этот последующий запрос снова будет содержать новый refresh_url и т. Д.
Rx.jQuery позволяет мне заставить API поиска в Twitter вызывать наблюдаемое событие, которое создает один из них, а затем завершается. То, что я пробовал до сих пор, заключается в том, чтобы обработчик onNext помнил refresh_url и использовал его в обработчике onCompleted для создания как нового наблюдаемого, так и соответствующего наблюдателя для следующего запроса. Таким образом, одна наблюдаемая + наблюдательная пара следует за ней бесконечно.
Проблема с этим подходом является:
Последующий наблюдаемыми/наблюдатель уже живы, когда их предшественники еще не были утилизированы.
Мне нужно сделать много неприятной бухгалтерской отчетности, чтобы поддерживать действительную ссылку на живущего в настоящее время наблюдателя, которого на самом деле может быть два. (Один в onCompleted, а другой - в другом жизненном цикле). Эта ссылка, конечно, необходима для отказа от подписки/распоряжения наблюдателем. Альтернативой бухгалтерскому учету будет реализация побочного эффекта с помощью «все еще выполняющегося?» - логического, как я сделал в моем примере.
Пример кода:
running = true;
twitterUrl = "http://search.twitter.com/search.json";
twitterQuery = "?rpp=10&q=" + encodeURIComponent(text);
twitterMaxId = 0; //actually twitter ignores its since_id parameter
newTweetObserver = function() {
return Rx.Observer.create(
function (tweet) {
if (tweet.id > twitterMaxId) {
twitterMaxId = tweet.id;
displayTweet(tweet);
}
}
);
}
createTwitterObserver = function() {
twitterObserver = Rx.Observer.create(
function (response) {
if (response.textStatus == "success") {
var data = response.data;
if (data.error == undefined) {
twitterQuery = data.refresh_url;
var tweetObservable;
tweetObservable = Rx.Observable.fromArray(data.results.reverse());
tweetObservable.subscribe(newTweetObserver());
}
}
},
function(error) { alert(error); },
function() {
//create and listen to new observer that includes a delay
if (running) {
twitterObservable = $.getJSONPAsObservable(twitterUrl, twitterQuery).delay(3000);
twitterObservable.subscribe(createTwitterObserver());
}
}
);
return twitterObserver;
}
twitterObservable = $.getJSONPAsObservable(twitterUrl, twitterQuery);
twitterObservable.subscribe(createTwitterObserver());
Не обманывайте двойным слоем наблюдаемых/наблюдателей от запросов к твитов. Мой пример касается в основном первого уровня: запрос данных из Twitter. Если при решении этой проблемы второй слой (преобразование ответов в твиты) может стать одним с первым, это было бы фантастическим; Но я думаю, что это совсем другое дело. На данный момент.
Эрик Мейер указал мне оператор Expand (см. Пример ниже) и предложил Join patterns в качестве альтернативы.
var ys = Observable.Expand
(new[]{0}.ToObservable() // initial sequence
, i => (i == 10 ? Observable.Empty<int>() // terminate
: new[]{i+1}.ToObservable() // recurse
)
);
ys.ToArray().Select(a => string.Join(",", a)).DumpLive();
Это должно быть скопировано в LINQPad. Он предполагает одноэлементные наблюдаемые и производит одного конечного наблюдателя.
Итак, мой вопрос: как я могу сделать трюк расширения в RxJS?
EDIT:
Оператор расширения, вероятно, может быть реализован, как показано на рисунке this thread. Но нужно было бы generators (и у меня только JS < 1.6).
К сожалению, RxJS 2.0.20304-beta не реализует метод расширения.
Я пришел к выводу, что решение этой проблемы является действительно [Expand оператора] (http://social.msdn.microsoft.com/Forums/da-DK/rx/thread/2746e373- bf43-4381-834c-8cc182704ae9), который еще не реализован в RxJS до версии 2.0.20304-бета. – derabbink
expand - это именно то, что вам нужно. Я понимаю, что этот пост старый, но можно просто перевести операторы, которые вам нужны, из будущей версии Rx в вашу собственную версию Rx. Это может потребовать некоторых манипуляций, но база кода существенно не изменилась с pre-v1. –
Кроме того, обновление Rx (по возможности) является хорошим решением. В последних версиях исправлены и исправлены ошибки. –