2017-02-04 14 views
2

Мне нужно создать Observable, который будет собирать информацию из разных источников (другие наблюдаемые данные), и каждый источник влияет на значение события, но все же значение построено на основе предыдущего value (вид конечного автомата).Слияние Наблюдаемые с обратной связью, слияния Наблюдаемые с самим собой

У нас есть сообщение с INT значение и код операции:

class Message{ 
    Integer value; 
    String operation; 

    public Message(Integer value, String operation) { 
     this.value = value; 
     this.operation = operation; 
    } 
} 

И какой-то источник таких значений со значением инициализации:

Observable<Message> dynamicMessage = Observable.just(new Message(1, "+")); 

В настоящее время существуют источники события. Эти источники будут испускать значения, на основе которых и на основе предыдущего значения dynamicMessage появится новое значение dynamicMessage. На самом деле его типы событий:

class Changer1 { 
    String operation; 

    public Changer1(String operation) { 
     this.operation = operation; 
    } 
} 


class Changer2 { 
    Integer value; 

    public Changer2(Integer value) { 
     this.value = value; 
    } 
} 

Changer1 отвечает за изменение операции. Changer2 отвечает за изменение стоимости.

и источники этих значений:

static Observable<Changer1> changers1 = Observable.just(new Changer1("-")) 
     .delay(1, TimeUnit.SECONDS).concatWith(Observable.just(new Changer1("+")) 
       .delay(2, TimeUnit.SECONDS)); 


static Observable<Changer2> changers2 = Observable.just(new Changer2(2)) 
     .delay(2, TimeUnit.SECONDS).concatWith(Observable.just(new Changer2(2)) 
       .delay(2, TimeUnit.SECONDS)); 

Теперь мне нужно изменить dynamicMessage Observable и принимать в отношении сообщений от смены. вот схема: state machine

Также я пытаюсь написал программу, как я вижу решение, но он висит с OutOfMemoryException, Rigth после первого значения:
Сообщения {значения = 1, операция = «+»}
Листинг:

import rx.Observable; 
import rx.functions.Action1; 
import rx.functions.Func1; 

public class RxDilemma { 


static class Message{ 
    Integer value; 
    String operation; 

    public Message(Integer value, String operation) { 
     this.value = value; 
     this.operation = operation; 
    } 

    @Override 
    public String toString() { 
     return "Message{" + 
       "value=" + value + 
       ", operation='" + operation + '\'' + 
       '}'; 
    } 
} 

static class Changer1 { 
    String operation; 

    public Changer1(String operation) { 
     this.operation = operation; 
    } 

    @Override 
    public String toString() { 
     return "Changer1{" + 
       "operation='" + operation + '\'' + 
       '}'; 
    } 
} 


static class Changer2 { 
    Integer value; 

    public Changer2(Integer value) { 
     this.value = value; 
    } 

    @Override 
    public String toString() { 
     return "Changer2{" + 
       "value=" + value + 
       '}'; 
    } 
} 





static Observable<Changer1> changers1 = Observable.just(new Changer1("-")) 
     .delay(1, TimeUnit.SECONDS).concatWith(Observable.just(new Changer1("+")) 
       .delay(2, TimeUnit.SECONDS)); 


static Observable<Changer2> changers2 = Observable.just(new Changer2(2)) 
     .delay(2, TimeUnit.SECONDS).concatWith(Observable.just(new Changer2(2)) 
       .delay(2, TimeUnit.SECONDS)); 


static Observable<Message> dynamicMessage = Observable.just(new Message(1, "+")).mergeWith(changers1.flatMap(new Func1<Changer1, Observable<Message>>() { 
    @Override 
    public Observable<Message> call(final Changer1 changer) { 
     return dynamicMessage.last().map(new Func1<Message, Message>() { 
      @Override 
      public Message call(Message message) { 
       message.operation = changer.operation; 
       return message; 
      } 
     }); 
    } 
})).mergeWith(changers2.flatMap(new Func1<Changer2, Observable<Message>>() { 
    @Override 
    public Observable<Message> call(final Changer2 changer2) { 
     return dynamicMessage.last().map(new Func1<Message, Message>() { 
      @Override 
      public Message call(Message message) { 
       if("+".equalsIgnoreCase(message.operation)){ 
        message.value = message.value+changer2.value; 
       } else if("-".equalsIgnoreCase(message.operation)){ 
        message.value = message.value-changer2.value; 
       } 
       return message; 
      } 
     }); 
    } 
})); 

public static void main(String[] args) throws InterruptedException { 

    dynamicMessage.subscribe(new Action1<Message>() { 
     @Override 
     public void call(Message message) { 
      System.out.println(message); 
     } 
    }); 


    Thread.sleep(5000000); 
} 
} 

Так что мой ожидается outpur является:

Сообщение {значение = 1, операция = '+'}
сообщение {значение = 1, операция = '-'}
Сообщение {значение = -1, операция = '-'}
сообщение {значение = 1, операция = '+'}
Сообщение {значение = 2, операция = '+'}

Также я после того как я слиял все с CombineLatest, но потом я обнаружил, что CombineLatest не предоставляет, какой из них был изменен, и без этой информации я не буду знать, какие изменения в сообщении должны быть выполнены. Любая помощь?

ответ

1

Другими словами, вы хотите уменьшитьсканирования свои действия, чтобы отразить изменения в состоянии. Существует оператор, по праву названный reducescan, который делает именно то, о чем вы просите. Он принимает каждое событие, которое приходит, и позволяет вам преобразовать его с добавлением предыдущего результата этого преобразования, которое будет вашим состоянием.

interface Changer { 
    State apply(State state); 
} 

class ValueChanger implements Changer { 
    ... 
    State apply(State state){ 
     // implement your logic of changing state by this action and return a new one 
    } 
    ... 
} 


class OperationChanger implements Changer { 
    ... 
    State apply(State state){ 
     // implement your logic of changing state by this action and return a new one 
    } 
    ... 
} 

Observable<ValueChanger> valueChangers 
Observable<OperationChanger> operationChangers 

Observable.merge(valueChangers, operationChangers) 
    .scan(initialState, (state, changer) -> changer.apply(state)) 
    .subscribe(state -> { 
     // called every time a change happens 
    }); 

Обратите внимание, что я немного изменил свое название, чтобы иметь больше смысла. Кроме того, здесь я использую лямбда-выражения Java8. Просто замените их на анонимные классы, если вы не можете использовать их в своем проекте.

EDIT: используйте scan оператор вместо reduce как это обеспечивает каждый результат по пути, а не излучающие только конечный результат

+0

Эта функция дает правильное окончательное значение, но отсутствуют промежуточные значения. 'subscribe' fire только тогда, когда все чейнджеры закончат работу. – msangel

+0

'valueChangers = Observable.interval (1, TimeUnit.SECONDS) .map (aLong -> новый ValueChanger (1))' заставляет это зависать вечно. – msangel

+0

Прошу прощения, я не использовал сокращение через некоторое время. Однако вы можете просто поменять «уменьшить» на «сканирование» теми же параметрами, и он будет работать, как ожидалось. Я отредактирую свой ответ сразу – koperko

1

Были внесены некоторые изменения, мы надеемся, хватит ваших потребностей. Пожалуйста, взгляните и верните результаты.

static class Message { 
    Integer value; 
    String operation; 

    public Message(Integer value, String operation) { 
     this.value = value; 
     this.operation = operation; 
    } 

    @Override 
    public String toString() { 
     return "Message{" + "value=" + value + ", operation='" + operation 
       + '\'' + '}'; 
    } 

    public Message applyChange(Object changer){ 
     if(changer instanceof Changer1){ 
      this.operation = ((Changer1)changer).operation; 
     }else if(changer instanceof Changer2){ 
      int v2 = ((Changer2)changer).value; 
      if("+".equals(operation)){ 
       value += v2; 
      }else if("-".equals(operation)){ 
       value -= v2; 
      } 
     } 
     return this; 
    } 
} 

static class Changer1{ 
    String operation; 

    public Changer1(String operation) { 
     this.operation = operation; 
    } 

    @Override 
    public String toString() { 
     return "Changer1{" + "operation='" + operation + '\'' + '}'; 
    } 
} 

static class Changer2{ 
    Integer value; 

    public Changer2(Integer value) { 
     this.value = value; 
    } 

    @Override 
    public String toString() { 
     return "Changer2{" + "value=" + value + '}'; 
    } 
} 

static Observable<? extends Object> changers1 = Observable 
     .just(new Changer1("-")) 
     .delay(1, TimeUnit.SECONDS) 
     .concatWith(
       Observable.just(new Changer1("+")).delay(2, 
         TimeUnit.SECONDS)); 

static Observable<? extends Object> changers2 = Observable 
     .just(new Changer2(2)) 
     .delay(2, TimeUnit.SECONDS) 
     .concatWith(
       Observable.just(new Changer2(2)).delay(2, TimeUnit.SECONDS)); 


static Observable<Message> dynamicMessage = Observable 
     .just(new Message(1, "+")) 
     .flatMap(m -> ((Observable<Object>)changers1).mergeWith((Observable<Object>)changers2).map(c -> m.applyChange(c))); 

public static void main(String[] args) throws InterruptedException { 
    dynamicMessage.toBlocking().subscribe(v -> System.out.println(v)); 
} 

Изменения:

Message имеет новый метод, который вызывается каждый раз, когда Changer испускается, он изменяет значение Message объекта. Это используется, потому что у вас есть только один объект Message, который на самом деле является значением init.

public Message applyChange(Object changer){ 
    if(changer instanceof Changer1){ 
     this.operation = ((Changer1)changer).operation; 
    }else if(changer instanceof Changer2){ 
     int v2 = ((Changer2)changer).value; 
     if("+".equals(operation)){ 
      value += v2; 
     }else if("-".equals(operation)){ 
      value -= v2; 
     } 
    } 
    return this; 
} 

dynamicMessage изменяется по мере необходимости -

static Observable<Message> dynamicMessage = Observable 
    .just(new Message(1, "+")) 
    .flatMap(m -> ((Observable<Object>)changers1) 
     .mergeWith((Observable<Object>)changers2) 
     .map(c -> m.applyChange(c))); 
+0

Хорошо, я пробовал это. Оно работает. Мне пришлось потратить еще 5 минут на размышления ***, как и почему? ***. Короткий вывод: он работает, но первая функция flatMap получает только одно значение, поэтому все операции в вложенных наблюдаемых выполняются физически на одном и том же объекте. Синхронизация функции 'applyChange' предотвратит из-за нарушения состояния. Второй ответ, похоже, работает, поэтому я буду отмечать как соединение нет. (Но читатели этого поставят еще один голос за принятый для них ответ). Спасибо. – msangel

0

Хорошо, у меня есть два ответа здесь. Один из них работает, но у него есть проблема с дизайном, которая заставляет контролировать одновременное выполнение руками. Другой, не работающий по дизайну (возможно, после некоторых манипуляций с созданием оператора reduce работает не для всех, но для каждой пары данных он будет работать). Но я размещаю собственный ответ здесь, потому что нашел более элегантное и правильное решение.

Перед тем, как ответить, взгляните на вопрос. Почему он висит? Ответ: потому что оператор last() блокирует. Поэтому мне нужно заставить его быть неблокирующимся. На основании другого ответа здесь: https://stackoverflow.com/a/37015509/449553, я могу использовать для этого класс BehaviorSubject.

Вот мой рабочий код:

final BehaviorSubject<Message> subject = BehaviorSubject.create(new Message(1, "+")); 
    Observable<Message> observable = Observable.<Changer>merge(changers1, changers2).map(new Func1<Changer, Message>() { 
     @Override 
     public Message call(Changer changer) { 
      Message value = subject.getValue(); 
      Message v = value.applyChange(changer); 
      subject.onNext(v); 
      return v; 
     } 
    }); 

(недостающие части кода вы найдете в других ответах here и here) Тем не менее, я не ознаменует ответ, как правильно, я считаю, кто-то может понадобиться решить эту загадку себе более правильным образом.

EDIT: Правильный способ был найден, см. Отмеченный ответ.