2016-11-14 5 views
1

Я делаю асинхронных вызовов, после 10 секунд в течение 1 минуты, что означает примерно 6 звонки будут сделаны, но проблема в я хочу применить delay на конкретных conditionDelay кроме Первый раз Rxjava Android

Observable 
.just(listOfSomethings_Locally) 
.take(1, TimeUnit.MINUTES) 
.serialize() 
.delaySubscription(// this is confusing part 
() -> 
    Observable.just(listOfItems_Network).take(10,TimeUnit.SECONDS) 
) 

Я хочу, чтобы задержка сетевого вызова в течение 10 секунд, за исключением первого вызова, и отмена сетевого вызова через 10 секунд, поэтому я должен иметь точные 6 вызовов за 1 минуту.

EDIT

Из-за путаницы в сценарии здесь переопределяется сценарий:

, что у меня есть большой список драйверов на местном уровне, и я хочу, чтобы отправить запрос к каждому из них после каждого 10 секунд и прослушать другого абонента , чтобы проверить, не отменил ли его водитель в течение 10 секунд, этот процесс будет продолжаться около 1 минуты, если один из водителей отменит i немедленно отправьте запрос на следующий

Код, написанные до сих пор:

Observable.from(driversGot) 
       .take(1,TimeUnit.MINUTES) 
       .serialize() 
       .map(this::requestRydeObservable) // requesting for single driver from driversGot (it's a network call) 
       .flatMap(dif -> 
         Observable.amb(
           kh.getFCM().driverCanceledRyde(), // listen for if driver cancel request returns integer 
           kh.getFCM().userRydeAccepted()) // listen for driver accept returns RydeAccepted object 
           .map(o -> { 
            if (o instanceof Integer) { 
             return new RydeAccepted(); 
            } else if (o instanceof RydeAccepted) { 
             return (RydeAccepted) o; 
            } 
            return null; 
           }).delaySubscription(10,TimeUnit.SECONDS) 
       ) 
       .subscribeOn(Schedulers.io()) 
       .observeOn(AndroidSchedulers.mainThread()) 
       .subscribe(fua -> { 
        if (fua == null) { 
         UiHelpers.showToast(context, "Invalid Firebase response"); 
        } else if (!fua.getStatus()) { // ryde is canceled because object is empty 
         UiHelpers.showToast(context, "User canceled ryde"); 
        } else { // ryde is accepted 
         UiHelpers.showToast(context, "User accepted ryde"); 
        } 
       }, t -> { 
        t.printStackTrace(); 
        UiHelpers.showToast(context,"Error sending driver requests"); 
       }, UiHelpers::stopLoading); 
+0

Просьба предоставить более подробную информацию о ваших наблюдаемых и вашем прецеденте. Насколько я понимаю ваше обновление: у вас есть список предметов, которые будут преобразованы в Observable. Вы хотите обработать один элемент за раз, который был нажат на вас. Значения должны быть запланированы на вас каждые 10 секунд. Пример: Sec 0: Значение 1, Sec 10: Значение 2. Каждое испускаемое значение будет обрабатываться через веб-вызов. Вы проверяете другой предмет, если предмет был отменен в течение 10 секунд. Если нет, то для одного элемента будет 60 секунд. Если отменено в течение 10 секунд, вы начинаете следующий в очереди? –

+0

Вы точно понимаете поток, за исключением того, что 60 секунд - это общее предельное время для всех элементов в списке –

+0

Итак, ваш список состоит из 6 элементов, или как бы вы хотели закончить через 60 секунд, если вы schnedule один элемент каждые 10 секунд? –

ответ

0

Привет вы можете использовать повторить попытку с задержкой есть способ, как вы можете сделать это here

+0

здесь немного сложнее, ссылка дает только объяснение поведения 'delay' –

2

Отзывов о коде

Вы дон Не нужно take и serialize, так как just испускает материал сразу + уже серийно.

delaySubscription кажется странным выбором, поскольку после того, как прошел наблюдаемым генерирует событие, дальнейшие события не задерживаются (что противоречит вашему делу) delaySubscription

Option # 1, RX только

Использование delay + просчитывать отдельные задержки для остальных событий (так первый задерживается на 0 секунд, вторая задержка на 1 секунду, 3 для 3, ...)

  AtomicLong counter = new AtomicLong(0); 
    System.out.println(new Date()); 
    Observable.just("1", "2", "3", "4", "5", "6") 
     .delay(item -> Observable.just(item).delay(counter.getAndIncrement(), TimeUnit.SECONDS)) 
     .subscribe(new Consumer<String>() { 
      public void accept(String result) throws Exception { 
       System.out.println(result + " " + new Date()); 
      } 
     });   
     System.in.read(); 

Вариант № 2: ограничение скорости

Кажется, что ваш UseCase подходит для оценки ограничения, поэтому мы можем использовать RateLimiter из гуавы:

  RateLimiter limiter = RateLimiter.create(1); 
    System.out.println(new Date()); 
    Observable.just("1", "2", "3", "4", "5", "6") 
     .map(r -> { 
      limiter.acquire(); 
      return r; 
     }) 
     .subscribe(new Consumer<String>() { 
      public void accept(String result) throws Exception { 
       System.out.println(result + " " + new Date()); 
      } 
     });   
     System.in.read(); 

Оба работают аналогично:

Tue Nov 15 11:14:34 EET 2016 
1 Tue Nov 15 11:14:34 EET 2016 
2 Tue Nov 15 11:14:35 EET 2016 
3 Tue Nov 15 11:14:36 EET 2016 
4 Tue Nov 15 11:14:37 EET 2016 
5 Tue Nov 15 11:14:38 EET 2016 
6 Tue Nov 15 11:14:39 EET 2016 

Оценить ограничитель будет работать лучше в случае вашего запроса, например передавая в течение 5 секунд, то это позволит ускорить выполнение следующих запросов, чтобы компенсировать задержку и достичь цели 1рек/с в течение 10 секунд.

+0

в вашем варианте # 1, как ограничить запрос в течение 1 минуты, так как каждый запрос выполняется после' (номер заказа) -1' секунд, но это не соответствует моему вопросу, у меня есть большой список драйверов локально, и я хочу отправить запрос каждому из них через каждые 10 секунд и прослушать другого абонента, чтобы проверить, не отменил ли драйвер его в течение 10 секунд, это процесс будет продолжаться около 1 минуты, если один из драйверов отменит, я должен немедленно отправить запрос на следующий –

+0

. Я только что обновил свой вопрос, объяснив scnerio –

0

Я хотел бы обновить сообщение @Ivans, потому что он пропускает обработку ошибок и использует побочные эффекты.

Этот пост будет использовать только операторы RxJava. Наблюдаемый будет давать значение каждые 10 секунд. Запрос истечет через 10 секунд.Если он достигнет таймаута, возвращается возвращаемое значение.

Первый тестовый метод получит 10 значений за 60 секунд. Наблюдаемый может закончиться до 60 секунд, если последний запрос заканчивается раньше 10 секунд.

public class TimeOutTest { 
    private static String DUMMY_VALUE = "ERROR"; 

    @Test 
    public void handles_in_60_seconds() throws Exception { 
     List<Integer> actions = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); 

     Observable<Long> timer = Observable.interval(0, 10, TimeUnit.SECONDS).take(6); 

     Observable<Integer> vales = Observable.fromIterable(actions) 
       .take(6); 

     Observable<Integer> observable = Observable.zip(timer, vales, (time, result) -> { 
      return result; 
     }); 

     Observable<String> stringObservable = observable.flatMap(integer -> { 
      return longNetworkLong(9_000) 
        .timeout(10, TimeUnit.SECONDS) 
        .onErrorReturnItem(DUMMY_VALUE); 
     }).doOnNext(s -> System.out.println("VALUE")); 

     stringObservable.test() 
       .awaitDone(60, TimeUnit.SECONDS) 
       .assertValueCount(6); 
    } 

    @Test 
    public void last_two_values_timeOut() throws Exception { 
     List<Integer> actions = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); 

     Observable<Long> timer = Observable.interval(0, 10, TimeUnit.SECONDS).take(6); 

     Observable<Integer> vales = Observable.fromIterable(actions) 
       .take(6); 

     Observable<Integer> observable = Observable.zip(timer, vales, (time, result) -> { 
      return result; 
     }); 

     Observable<String> stringObservable = observable 
       .map(integer -> integer * 2500) 
       .flatMap(integer -> { 
        return longNetworkLong(integer) 
          .timeout(10, TimeUnit.SECONDS) 
          .doOnError(throwable -> System.out.print("Timeout hit?")) 
          .onErrorReturnItem(DUMMY_VALUE); 
       }) 
       .doOnNext(s -> System.out.println("VALUE")) 
       .filter(s -> !Objects.equals(s, DUMMY_VALUE)); 

     stringObservable.test() 
       .awaitDone(60, TimeUnit.SECONDS) 
       .assertValueCount(4); 

    } 

    private Observable<String> longNetworkLong(int delayTime) { 
     return Observable.fromCallable(() -> { 
      Thread.sleep(delayTime); 
      return "result"; 
     }); 
    } 
} 
+0

, что именно делает этот код? –

+1

Он принимает список и преобразует его в наблюдаемый. Существует еще одно Наблюдаемое, которое испускает значение каждые 10 секунд. Я беру 6 значений из обоих и Zip их вместе. Таким образом, я получаю значение, нажимаемое на меня каждые 10 секунд, начиная со второго 0. Для каждого значения я вызываю длинный метод: longNetworkLong. Если результат longNetworkLong Observable занимает больше 10 секунд, я отменяю запрос и предоставляю резервное значение. Я посмотрю ваше новое требование. –