Мне нужно создать Observable, который будет собирать информацию из разных источников (другие наблюдаемые данные), и каждый источник влияет на значение события, но все же значение построено на основе предыдущего value (вид конечного автомата).Слияние Наблюдаемые с обратной связью, слияния Наблюдаемые с самим собой
У нас есть сообщение с INT значение и код операции:
class Message{
Integer value;
String operation;
public Message(Integer value, String operation) {
this.value = value;
this.operation = operation;
}
}
И какой-то источник таких значений со значением инициализации:
Observable<Message> dynamicMessage = Observable.just(new Message(1, "+"));
В настоящее время существуют источники события. Эти источники будут испускать значения, на основе которых и на основе предыдущего значения dynamicMessage появится новое значение dynamicMessage. На самом деле его типы событий:
class Changer1 {
String operation;
public Changer1(String operation) {
this.operation = operation;
}
}
class Changer2 {
Integer value;
public Changer2(Integer value) {
this.value = value;
}
}
Changer1 отвечает за изменение операции. Changer2 отвечает за изменение стоимости.
и источники этих значений:
static Observable<Changer1> changers1 = Observable.just(new Changer1("-"))
.delay(1, TimeUnit.SECONDS).concatWith(Observable.just(new Changer1("+"))
.delay(2, TimeUnit.SECONDS));
static Observable<Changer2> changers2 = Observable.just(new Changer2(2))
.delay(2, TimeUnit.SECONDS).concatWith(Observable.just(new Changer2(2))
.delay(2, TimeUnit.SECONDS));
Теперь мне нужно изменить dynamicMessage Observable и принимать в отношении сообщений от смены. вот схема:
Также я пытаюсь написал программу, как я вижу решение, но он висит с OutOfMemoryException, Rigth после первого значения:
Сообщения {значения = 1, операция = «+»}
Листинг:
import rx.Observable;
import rx.functions.Action1;
import rx.functions.Func1;
public class RxDilemma {
static class Message{
Integer value;
String operation;
public Message(Integer value, String operation) {
this.value = value;
this.operation = operation;
}
@Override
public String toString() {
return "Message{" +
"value=" + value +
", operation='" + operation + '\'' +
'}';
}
}
static class Changer1 {
String operation;
public Changer1(String operation) {
this.operation = operation;
}
@Override
public String toString() {
return "Changer1{" +
"operation='" + operation + '\'' +
'}';
}
}
static class Changer2 {
Integer value;
public Changer2(Integer value) {
this.value = value;
}
@Override
public String toString() {
return "Changer2{" +
"value=" + value +
'}';
}
}
static Observable<Changer1> changers1 = Observable.just(new Changer1("-"))
.delay(1, TimeUnit.SECONDS).concatWith(Observable.just(new Changer1("+"))
.delay(2, TimeUnit.SECONDS));
static Observable<Changer2> changers2 = Observable.just(new Changer2(2))
.delay(2, TimeUnit.SECONDS).concatWith(Observable.just(new Changer2(2))
.delay(2, TimeUnit.SECONDS));
static Observable<Message> dynamicMessage = Observable.just(new Message(1, "+")).mergeWith(changers1.flatMap(new Func1<Changer1, Observable<Message>>() {
@Override
public Observable<Message> call(final Changer1 changer) {
return dynamicMessage.last().map(new Func1<Message, Message>() {
@Override
public Message call(Message message) {
message.operation = changer.operation;
return message;
}
});
}
})).mergeWith(changers2.flatMap(new Func1<Changer2, Observable<Message>>() {
@Override
public Observable<Message> call(final Changer2 changer2) {
return dynamicMessage.last().map(new Func1<Message, Message>() {
@Override
public Message call(Message message) {
if("+".equalsIgnoreCase(message.operation)){
message.value = message.value+changer2.value;
} else if("-".equalsIgnoreCase(message.operation)){
message.value = message.value-changer2.value;
}
return message;
}
});
}
}));
public static void main(String[] args) throws InterruptedException {
dynamicMessage.subscribe(new Action1<Message>() {
@Override
public void call(Message message) {
System.out.println(message);
}
});
Thread.sleep(5000000);
}
}
Так что мой ожидается outpur является:
Сообщение {значение = 1, операция = '+'}
сообщение {значение = 1, операция = '-'}
Сообщение {значение = -1, операция = '-'}
сообщение {значение = 1, операция = '+'}
Сообщение {значение = 2, операция = '+'}
Также я после того как я слиял все с CombineLatest, но потом я обнаружил, что CombineLatest не предоставляет, какой из них был изменен, и без этой информации я не буду знать, какие изменения в сообщении должны быть выполнены. Любая помощь?
Эта функция дает правильное окончательное значение, но отсутствуют промежуточные значения. 'subscribe' fire только тогда, когда все чейнджеры закончат работу. – msangel
'valueChangers = Observable.interval (1, TimeUnit.SECONDS) .map (aLong -> новый ValueChanger (1))' заставляет это зависать вечно. – msangel
Прошу прощения, я не использовал сокращение через некоторое время. Однако вы можете просто поменять «уменьшить» на «сканирование» теми же параметрами, и он будет работать, как ожидалось. Я отредактирую свой ответ сразу – koperko