2017-02-05 16 views
3

Usecase: Я разрабатываю приложение для Android, у которого есть просмотрщик с 4 вкладками, все из которых являются фрагментами. Для каждой вкладки/фрагмента мне нужно подключиться к REST Api с выводом Oauth и токена каждые 5 минут.RxJava - Подождите, пока retryWhen закончить другие наблюдаемые в других мероприятиях/фрагментах

Последнее решение: Использование RxJava и retryWhen оператор я могу повторно аутентифицировать при получении ошибки 401 HTTP. Для каждого наблюдаемого потока подписных и потребленные и использовать:

retryWhen(refreshTokenAuthenticator) 

Итак, когда маркер истекает поток потребить его, а затем выполнить реальный вызов API.

Проблема: это работает только для одной наблюдаемых потребленных в одном подписались, но мне нужно, чтобы позволить пользователю переключаться между вкладками, не блокируя его/ее, принимая во внимание, что 401 ошибки может появиться в любое время в любом фрагменте в любой Api Call.

Вопрос: Есть ли способ, чтобы сделать наблюдаемые ждать других наблюдаемые закончить с onNext(), которые не находятся в том же потоке/абоненте? На самом деле в разных Фрагментах? Таким образом, сценарии апи вызов будет выглядеть следующим образом:

Api Call Fragment A --> request 
Api Call Fragment A <-- response 200 Code 

Api Call Fragment B --> request 
Api Call Fragment B <-- response 401 Code (retryWhen in action) 
Api Call Fragment B --> request (refreshToken) 
Api Call Fragment B <-- response 200 (with new access token saved in the app) 

Почти в то же время ...

Api Call Fragment C --> request 
Api Call Fragment C <-- response 401 Code (retryWhen in action) 

Observable in Fragment C Waits till Observable in Fragment B finish (onNext()) 

Api Call Fragment C --> request 
Api Call Fragment C <-- response 200 

Это то, что у меня уже есть , каждый вызов API выглядит почти так же:

public void getDashboardDetail() { 

    Subscription subscription = repository.getDashboard() 
      .subscribeOn(Schedulers.io()) 
      .observeOn(AndroidSchedulers.mainThread()) 
      .retryWhen(tokenAuthenticator) 
      .subscribe(new RestHttpObserver<UserDataDto>() { 
       @Override 
       public void onUnknownError(Throwable e) { 
        getMvpView().onError(e); 
       } 

       @Override 
       public void onHostUnreachable() { 
        getMvpView().onHostUnreachable(); 
       } 

       @Override 
       public void onHttpErrorCode(int errorCode, ErrorDto errorDto) { 
        getMvpView().onHttpErrorCode(errorCode, errorDto); 
       } 

       @Override 
       public void onCompleted() { 
        //Do nothing... 
       } 

       @Override 
       public void onNext(UserDataDto response) { 
        getMvpView().onReceiveUserData(response); 
       } 
      }); 

    this.compositeSubscription.add(subscription); 

} 

И мой RefreshTokenAuthenticator:

public class RefreshTokenAuthenticator implements Func1<Observable<? extends Throwable>, Observable<?>> { 

private static final int RETRY_COUNT = 1; 

private static final int HTTP_ERROR_CODE = 401; 

@Inject 
private UserRepository repository; 

@Inject 
private SessionManager sessionManager; 

@Inject 
private MyApplication application; 


@Inject 
private RefreshTokenAuthenticator() { 
} 

@Override 
public synchronized Observable<?> call(Observable<? extends Throwable> observable) { 
    return observable 
      .flatMap(new Func1<Throwable, Observable<?>>() { 
       int retryCount = 0; 

       @Override 
       public Observable<?> call(final Throwable throwable) { 

        retryCount++; 
        if (retryCount <= RETRY_COUNT && throwable instanceof HttpException) { 
         int errorCode = ((HttpException) throwable).code(); 
         if (errorCode == HTTP_ERROR_CODE) { 
          return repository 
            .refreshToken(sessionManager.getAuthToken().getRefreshToken()) 
            .observeOn(AndroidSchedulers.mainThread()) 
            .subscribeOn(Schedulers.io()) 

            .doOnNext(tokenDto -> sessionManager.saveAuthToken(tokenDto)) 
            .doOnError(throwable1 -> { 
             Log.e("RefreshTokenAuth", "DoOnError", throwable1); 
             application.logout(); 
            }); 

         } 
        } 
        // No more retries. Pass the original Retrofit error through. 
        return Observable.error(throwable); 
       } 
      }); 
} 

}

ответ

2

1) Сделать источник AUTH лексемы кэш последнего успешного результата + обеспечивают способ аннулирует кэшированный результат:

class Auth { 
    private Observable<AuthToken> validToken; 

    synchronized void invalidateAuthToken() { 
     validToken = null; 
    } 

    synchronized Observable<AuthToken> getAuthToken() { 
     if (validToken == null) { 
      validToken = repository 
       .refreshToken(...) // start async request 
       .doOnError(e -> invalidateAuthToken()) 
       .replay(1); // cache result 
     } 
     return validToken; // share among all subscribers 
    } 
} 

2) Для доступа к веб-сервису используйте следующий шаблон:

Observable<Data1> dataSource1 = 
    Observable.defer(auth.getAuthToken()) // always start from token 
     .flatMap(token -> 
      repository.fetchData1(token, ...)) // use token to call web service 
     .doOnError(e -> auth.invalidateAuthToken()) 
     .retry(N); // retry N times 
+0

Я использую и перехватчик с Модернизированный 2, чтобы перехватить запрос и добавить заголовок с маркером. Считаете ли вы, что ваш ответ (с использованием authtoken по Rx) лучше или, по крайней мере, более прост в обращении? –

+0

@NicolasJafelle Мой пример является общим.Вы можете реализовать функцию 'fetchData1', чтобы он вводил токен способом« Дооснащение ». –

0

Наконец, заставьте его работать, просто добавив глобальный (в моем классе Application) логический, если приложение в настоящее время повторно аутентифицируется или нет. Фактически он разрешает две ошибки 401 HTTP, а второй - в onNext() и выполняет начальное наблюдение. Я хотел бы сделать что-то более реактивное, но, по крайней мере, это решает мою главную проблему.

public class RefreshTokenAuthenticator implements Func1<Observable<? extends Throwable>, Observable<?>> { 

private static final int RETRY_COUNT = 1; 

private static final int HTTP_ERROR_CODE = 401; 

@Inject 
private UserRepository repository; 

@Inject 
private SessionManager sessionManager; 

@Inject 
private MyApplication application; 


@Inject 
private RefreshTokenAuthenticator() { 
} 

@Override 
public Observable<?> call(Observable<? extends Throwable> observable) { 
    return observable 
      .flatMap(new Func1<Throwable, Observable<?>>() { 
       int retryCount = 0; 

       @Override 
       public Observable<?> call(final Throwable throwable) { 

        retryCount++; 
        if (retryCount <= RETRY_COUNT && throwable instanceof HttpException) { 
         int errorCode = ((HttpException) throwable).code(); 

         if (errorCode == HTTP_ERROR_CODE) { 

          Log.i("RefreshTokenAuth", "APPLICATION IS AUTHENTICATING = " + application.isAuthenticating); 
          if (!application.isAuthenticating) { 
           application.isAuthenticating = true; 

           String refreshToken = sessionManager.getAuthToken().getRefreshToken(); 

           return repository 
             .refreshToken(refreshToken) 
             .observeOn(AndroidSchedulers.mainThread()) 
             .subscribeOn(Schedulers.io()) 
             .doOnCompleted(() -> application.isAuthenticating = false) 
             .doOnNext(tokenDto -> sessionManager.saveAuthToken(tokenDto)) 
             .doOnError(throwable1 -> { 
              Log.e("RefreshTokenAuth", "DoOnError", throwable1); 
              application.logout(); 
             }); 
          } else { 
           return Observable.just(1).doOnNext(o -> Log.i("RefreshTokenAuth", "Let's try another shot!")); 
          } 
         } 
        } 
        // No more retries. Pass the original Retrofit error through. 
        return Observable.error(throwable); 
       } 
      }); 
} 

}