2015-06-17 9 views
20

Моя проблема: я не могу получить бесконечный поток с Retrofit. После того, как я получу учетные данные для первоначального запроса poll(), я выполняю первоначальный запрос poll(). Каждый запрос poll() отвечает через 25 секунд, если изменений нет, или раньше, если есть какие-либо изменения - возврат измененных_данных []. Каждый ответ содержит timestamp данные, необходимые для следующего запроса опроса - я должен выполнить новый запрос poll() после каждого опроса(). Вот мой код:RxJava + Дооснащение длинным опросом

getServerApi().getLongPollServer() 
    .flatMap(longPollServer -> getLongPollServerApi(longPollServer.getServer()).poll("a_check", Config.LONG_POLLING_SERVER_TIMEOUT, 2, longPollServer.getKey(), longPollServer.getTs(), "") 
    .take(1) 
    .flatMap(longPollEnvelope -> getLongPollServerApi(longPollServer.getServer()).poll("a_check", Config.LONG_POLLING_SERVER_TIMEOUT, 2, longPollServer.getKey(), longPollEnvelope.getTs(), ""))) 
    .retry() 
    .subscribe(longPollEnvelope1 -> { 
    processUpdates(longPollEnvelope1.getUpdates()); 
}); 

Я новичок в RxJava, может быть, я чего-то не понимаю, но я не могу получить бесконечный поток. Я получаю 3 вызова, затем onNext и onComplete.

P.S. Может быть, есть лучшее решение для внедрения долгого опроса на Android?

+0

В вашем случае я бы рассмотрел возможность реализовать свой собственный «Наблюдаемый» с 'Observable.create()' –

ответ

11

Хотя я не идеален, я считаю, что вы можете использовать побочные эффекты RX для достижения желаемого результата (операции «doOn»).

Observable<CredentialsWithTimestamp> credentialsProvider = Observable.just(new CredentialsWithTimestamp("credentials", 1434873025320L)); // replace with your implementation 

Observable<ServerResponse> o = credentialsProvider.flatMap(credentialsWithTimestamp -> { 
    // side effect variable 
    AtomicLong timestamp = new AtomicLong(credentialsWithTimestamp.timestamp); // computational steering (inc. initial value) 
    return Observable.just(credentialsWithTimestamp.credentials) // same credentials are reused for each request - if invalid/onError, the later retry() will be called for new credentials 
      .flatMap(credentials -> api.query("request", credentials, timestamp.get())) // this will use the value from previous doOnNext 
      .doOnNext(serverResponse -> timestamp.set(serverResponse.getTimestamp())) 
      .repeat(); 
}) 
     .retry() 
     .share(); 

private static class CredentialsWithTimestamp { 

    public final String credentials; 
    public final long timestamp; // I assume this is necessary for you from the first request 

    public CredentialsWithTimestamp(String credentials, long timestamp) { 
     this.credentials = credentials; 
     this.timestamp = timestamp; 
    } 
} 

При подписке на «o» внутренний наблюдаемый будет повторяться. Если возникла ошибка, тогда «o» повторит запрос и повторно запросит его из потока учетных данных.

В вашем примере управление вычислением достигается путем обновления переменной временной метки, необходимой для следующего запроса.

+0

благодарю за ваш ответ. Тем не менее, я получаю временную метку от api, и я должен отправить ее обратно с новым вызовом poll(). – localhost

+0

Я обновил ответ, надеюсь, ближе к вашей ситуации. Вы можете видеть, что когда вы получаете серверResponse, вы просто устанавливаете переменную. «doOnNext» делает побочный эффект явным. Моя забота о том, что это некрасиво, и нам нужно будет увидеть, как ваш код будет лучше отвечать. – snodnipper

+0

У меня есть аналогичная проблема и решена с использованием вашего кода, но в моем случае я хочу сохранить значение слишком в первый раз. Где я могу поместить этот код? – Krutik

 Смежные вопросы

  • Нет связанных вопросов^_^