7

У меня были проблемы с рекурсивной цепочкой наблюдаемых.RxJS: Рекурсивный список наблюдаемых и одиночных наблюдателей

Я работаю с RxJS, который в настоящее время находится в версии 1.0.10621, и содержит основную функциональность Rx в сочетании с Rx для jQuery.

Позвольте мне представить пример сценария для моей проблемы: Я опроса Twitter search API (ответ JSON) для твитов/обновлений, содержащих определенное ключевое слово. Ответ также включает в себя «refresh_url», который следует использовать для генерации последующего запроса. Ответ на этот последующий запрос снова будет содержать новый refresh_url и т. Д.

Rx.jQuery позволяет мне заставить API поиска в Twitter вызывать наблюдаемое событие, которое создает один из них, а затем завершается. То, что я пробовал до сих пор, заключается в том, чтобы обработчик onNext помнил refresh_url и использовал его в обработчике onCompleted для создания как нового наблюдаемого, так и соответствующего наблюдателя для следующего запроса. Таким образом, одна наблюдаемая + наблюдательная пара следует за ней бесконечно.

Проблема с этим подходом является:

  1. Последующий наблюдаемыми/наблюдатель уже живы, когда их предшественники еще не были утилизированы.

  2. Мне нужно сделать много неприятной бухгалтерской отчетности, чтобы поддерживать действительную ссылку на живущего в настоящее время наблюдателя, которого на самом деле может быть два. (Один в onCompleted, а другой - в другом жизненном цикле). Эта ссылка, конечно, необходима для отказа от подписки/распоряжения наблюдателем. Альтернативой бухгалтерскому учету будет реализация побочного эффекта с помощью «все еще выполняющегося?» - логического, как я сделал в моем примере.

Пример кода:

  running = true; 
      twitterUrl = "http://search.twitter.com/search.json"; 
      twitterQuery = "?rpp=10&q=" + encodeURIComponent(text); 
      twitterMaxId = 0; //actually twitter ignores its since_id parameter 

      newTweetObserver = function() { 
       return Rx.Observer.create(
         function (tweet) { 
          if (tweet.id > twitterMaxId) { 
           twitterMaxId = tweet.id; 
           displayTweet(tweet); 
          } 
         } 
        ); 
      } 

      createTwitterObserver = function() { 
       twitterObserver = Rx.Observer.create(
         function (response) { 
          if (response.textStatus == "success") { 
           var data = response.data; 
           if (data.error == undefined) { 
            twitterQuery = data.refresh_url; 
            var tweetObservable; 
            tweetObservable = Rx.Observable.fromArray(data.results.reverse()); 
            tweetObservable.subscribe(newTweetObserver()); 
           } 
          } 
         }, 
         function(error) { alert(error); }, 
         function() { 
          //create and listen to new observer that includes a delay 
          if (running) { 
           twitterObservable = $.getJSONPAsObservable(twitterUrl, twitterQuery).delay(3000); 
           twitterObservable.subscribe(createTwitterObserver()); 
          } 
         } 
        ); 
       return twitterObserver; 
      } 
      twitterObservable = $.getJSONPAsObservable(twitterUrl, twitterQuery); 
      twitterObservable.subscribe(createTwitterObserver()); 

Не обманывайте двойным слоем наблюдаемых/наблюдателей от запросов к твитов. Мой пример касается в основном первого уровня: запрос данных из Twitter. Если при решении этой проблемы второй слой (преобразование ответов в твиты) может стать одним с первым, это было бы фантастическим; Но я думаю, что это совсем другое дело. На данный момент.

Эрик Мейер указал мне оператор Expand (см. Пример ниже) и предложил Join patterns в качестве альтернативы.

var ys = Observable.Expand 
(new[]{0}.ToObservable() // initial sequence 
        , i => (i == 10 ? Observable.Empty<int>() // terminate 
     : new[]{i+1}.ToObservable() // recurse 
) 
); 

ys.ToArray().Select(a => string.Join(",", a)).DumpLive(); 

Это должно быть скопировано в LINQPad. Он предполагает одноэлементные наблюдаемые и производит одного конечного наблюдателя.

Итак, мой вопрос: как я могу сделать трюк расширения в RxJS?

EDIT:
Оператор расширения, вероятно, может быть реализован, как показано на рисунке this thread. Но нужно было бы generators (и у меня только JS < 1.6).
К сожалению, RxJS 2.0.20304-beta не реализует метод расширения.

+2

Я пришел к выводу, что решение этой проблемы является действительно [Expand оператора] (http://social.msdn.microsoft.com/Forums/da-DK/rx/thread/2746e373- bf43-4381-834c-8cc182704ae9), который еще не реализован в RxJS до версии 2.0.20304-бета. – derabbink

+1

expand - это именно то, что вам нужно. Я понимаю, что этот пост старый, но можно просто перевести операторы, которые вам нужны, из будущей версии Rx в вашу собственную версию Rx. Это может потребовать некоторых манипуляций, но база кода существенно не изменилась с pre-v1. –

+0

Кроме того, обновление Rx (по возможности) является хорошим решением. В последних версиях исправлены и исправлены ошибки. –

ответ

4

Так что я собираюсь попытаться решить вашу проблему немного иначе, чем вы, и принять некоторые вольности, которые вы можете решить легче.

Так 1 вещь я не могу сказать, это то, что вы пытаетесь следующих шагов

  • Получить первый список твит, который содержит следующий URL
  • После чирикать список получен, onNext текущего наблюдателя и получить следующий набор твитов
    • Делайте это до бесконечности

Или есть действие пользователя (получить больше/прокрутка вниз). В любом случае, это действительно та же проблема. Возможно, я неправильно читаю вашу проблему. Вот ответ на этот вопрос.

function getMyTweets(headUrl) { 
    return Rx.Observable.create(function(observer) { 

     innerRequest(headUrl); 
     function innerRequest(url) { 
      var next = ''; 

      // Some magic get ajax function 
      Rx.get(url).subscribe(function(res) { 
       observer.onNext(res); 
       next = res.refresh_url; 
      }, 
      function() { 
       // Some sweet handling code 
       // Perhaps get head? 
      }, 
      function() { 
       innerRequest(next); 
      }); 
     } 
    }); 
} 

Возможно, это не тот ответ, о котором вы просили. Если нет, извините!

Редактировать: После просмотра кода, похоже, вы хотите принимать результаты как массив и обследовать его.

// From the results perform a select then a merge (if ordering does not matter). 
getMyTweets('url') 
    .selectMany(function(data) { 
     return Rx.Observable.fromArray(data.results.reverse()); 
    }); 

// Ensures ordering 
getMyTweets('url') 
    .select(function(data) { 
     return Rx.Observable.fromArray(data.results.reverse()); 
    }) 
    .concat();