2017-02-16 9 views
0

В моем проекте Android я сильно полагаюсь на RxJava2, SqlBrite (с RxJavaInterop) и SqlDelight.Query RxJava2 Db в другой предмет

У меня есть один поток RX, который должен идти бесконечно (пока мое обслуживание не прекратится), и на нем у меня есть .flatMapFunction<String, ObservableSource<Action>>.

Значение, это flatMap содержит Subject<Action>, получит String actionId, сделать некоторые (не имеет значения для вопроса) обработки на этих ActionId, и в зависимости от состояния должны запросить базу данных для Action объекта и направить его на subject

Мой первый подход должен был сделать запрос непосредственно:

Cursor c = db.query(...); 
if(c.moveFirst()) { 
    Action a = Action.SELECT_ALL_MAPPER.map(c); 
    subject.onNext(selectAll); 
} 

Но это блокирует бегущая нить, и я скорее вызвать это на своем собственном потоке, который необходимо сделать следующее:

  • запрос (должен возвращать 0 или 1 пункт)
  • если есть значение: карта для Action объекта и нажать значение для subject
  • если нет значения: прекратить/утилизировать.
  • subject не может принять окончание или ошибку. Он должен оставаться в живых для будущих событий.

Мой текущий подход следующий код:

RxJavaInterop.toV2Observable(db.createQuery(
    Action.TABLE_NAME, 
    Action.FACTORY.Select_by_id(actionId).statement) 
    .mapToOne(new Func1<Cursor, Action>() { 
     @Override public Action call(Cursor cursor) { 
      return Action.SELECT_ALL_MAPPER.map(cursor); 
     } 
    })) 
    .take(1) 
    .subscribe(new Consumer<Action>() { 
     @Override public void accept(Action action) throws Exception { 
      subject.onNext(action); 
     } 
    }); 

И хотя это, кажется, делает трюк на первое впечатление, я вижу несколько ошибок на нем:

  • я могу Не распоряжайтесь им. Даже если я получаю ссылку на объект Disposable, я не могу вызывать его изнутри Consumer<Action>, потому что он «возможно, не был инициализирован» (что я понимаю, причина в том, что все в порядке).
  • Если нет действия с данным идентификатором, наблюдаемый будет висеть там навсегда, пока не будет убита ВМ.

Так что вопрос:

Как я могу это сделать?

ответ

1

Я скорее вызвать это на своем собственном потоке

Посмотрите на RxAndroid. Это может выглядеть следующим образом:

yourRxStream 
    .flatMap(*db request here*) 
    .subscribeOn(Schedulers.io()) 
    .subcribe(subject); 

subject не может получить прекратить или ошибки. Он должен остаться в живых для событий .

переключатель Предмет с Relay:

Субъекты полезны для преодоления разрыва между не-Rx API. Тем не менее, они являются работоспособными с повреждением: когда они получают onComplete или onError, они больше не могут использоваться для перемещения данных. Это наблюдаемый контракт , и иногда это желаемое поведение. Большинство раз это не так.

Реле просто субъекты без вышеупомянутого имущества. Они позволяют легко монтировать API-интерфейсы без Rx в Rx и без беспокойства случайного запуска состояния терминала.


Наконец, для запроса, чем можно вывести 0 или 1 пункта, использовать Maybe.

+0

Реле - хорошая идея, и я проверю ее. Но 0 или 1, к сожалению, это не решает с вашим предложением. RxJava2 не принимает 'null', и он напрямую выдает NPE. Кроме того, наблюдаемый запрос БД создается библиотекой sqlbrite, которая остается открытой/обновляемой до тех пор, пока не будет удалена. Поэтому мне нужно отключить или, может быть, тайм-аут, чтобы закрыть его, поскольку мне нужно только одно одноразовое событие, а затем ждать следующего 'actionId', который будет новым запросом. Спасибо за внимание. – Budius

+0

Поскольку вы не можете отправить null, я думал о написании базовой монады, чтобы обернуть элемент или его отсутствие в класс, который можно было бы назвать «Может быть». Затем я вспомнил, что, возможно, уже существует как наблюдаемая альтернатива. Поскольку это просто идеально подходит, я обновил свой ответ. –