2014-10-21 3 views
2

В RxJava для Android я хочу испускать элементы в интервале, который зависит от самого элемента: в Observable я вытягиваю элемент из очереди, обрабатываю это и испустить. В зависимости от типа элемента, я хочу настроить после того, как выйдет следующий элемент (замедлить или ускорить интервал).Испускать элементы в RxJava в интервале, который зависит от самого испущенного элемента

Следующий код, предложенный @ a.bertucci здесь Emit objects for drawing in the UI in a regular interval using RxJava on Android демонстрирует, как испускать элементы в регулярном интервале.

private void drawPath(final String chars) { 
    Observable.zip(
     Observable.create(new Observable.OnSubscribe<Path>() { 
      // all the drawing stuff here 
      ... 
     }), 
     Observable.timer(0, 50, TimeUnit.MILLISECONDS), 
     new Func2<Path, Long, Path>() { 
      @Override 
      public Path call(Path path, Long aLong) { 
       return path; 
      } 
     } 
    ) 
    .subscribeOn(Schedulers.newThread()) 
    .observeOn(AndroidSchedulers.mainThread()) 
    ... 
} 

Мой вопрос теперь, если это вообще возможно, чтобы изменить частоту излучения, а наблюдаемая излучающий, а что предпочтительная реализация с использованием RxJava.

ответ

0

Вы можете использовать

public final <U, V> Observable<T> delay(
      Func0<? extends Observable<U>> subscriptionDelay, 
      Func1<? super T, ? extends Observable<V>> itemDelay) 

или

public final <U> Observable<T> delay(Func1<? super T, ? extends Observable<U>> itemDelay) 

Вы можете использовать itemDelay для управления скоростью.

+0

Разве это не перекладывать всю последовательность, определенная задержка? Это не то, что мне нужно. Я не хочу менять последовательность. Я хочу, чтобы первый элемент был немедленно удален. Затем (или раньше), испускающий этот элемент, я хочу посмотреть на элемент, который я испускаю, и решить, когда будут выбрасываться следующие элементы (в зависимости от элемента). Затем выдается следующий элемент, и время снова изменяется для элемента после. –

0

Согласно вашему комментарию, я думаю, вы должны создать нового оператора для этого.

Этот оператор будет принимать функцию, вычислить задержку, чтобы применить испускать вашему следующему пункту

Observable.range(1, 1000).lift(new ConfigurableDelay((item) -> 3 SECONDS) 
          .subscribe(); 

Вы можете попробовать что-то вроде этого:

public class ConfDelay { 

public static void main(String[] args) { 
    Observable.range(1, 1000).lift(new ConfigurableDelay(ConfDelay::delayPerItem, Schedulers.immediate())) 
      .map((i) -> "|") 
      .subscribe(System.out::print); 
} 


public static TimeConf delayPerItem(Object item) { 
    long value = ((Integer) item).longValue(); 
    return new TimeConf(value * value, TimeUnit.MILLISECONDS); 
} 

private static class TimeConf { 
    private final long time; 
    private final TimeUnit unit; 

    private TimeConf(final long time, final TimeUnit unit) { 
     this.time = time; 
     this.unit = unit; 
    } 
} 

private static class ConfigurableDelay<T> implements Observable.Operator<T, T> { 
    private final Func1<T, TimeConf> itemToTime; 
    private final Scheduler scheduler; 

    public ConfigurableDelay(final Func1<T, TimeConf> itemToTime) { 
     this(itemToTime, Schedulers.computation()); 
    } 

    public ConfigurableDelay(final Func1<T, TimeConf> itemToTime, final Scheduler scheulder) { 
     this.itemToTime = itemToTime; 
     this.scheduler = scheulder; 
    } 

    @Override 
    public Subscriber<? super T> call(final Subscriber<? super T> subscriber) { 
     return new Subscriber<T>(subscriber) { 

      private TimeConf nextTime = null; 

      @Override 
      public void onCompleted() { 
       subscriber.onCompleted(); 
      } 

      @Override 
      public void onError(final Throwable e) { 
       subscriber.onError(e); 
      } 

      @Override 
      public void onNext(final T t) { 
       TimeConf previousNextTime = nextTime; 
       this.nextTime = itemToTime.call(t); 
       if (previousNextTime == null) { 
        subscriber.onNext(t); 
       } else { 
        scheduler.createWorker().schedule(() -> subscriber.onNext(t), previousNextTime.time, previousNextTime.unit); 
       } 
      } 
     }; 
    } 
} 
} 
+1

Я думаю, что мой код можно заменить на zip/flatmap следующим образом: obs.map (e -> новая пара (e, 3 SECONDS)) .flatMap ((conf) -> Observable.just (conf. событие) .delay (conf.time)) .subscribe(); – dwursteisen