2016-06-22 2 views
0

Я работаю с услугой BLE GATT, которая имеет две характеристики, которые работают в тандеме. Один из них является характеристикой только для записи, где вы можете представить строковое значение в качестве запроса, а другое - только для уведомлений, в котором вы получаете ответ на запрос.Управление побочными эффектами с несколькими наблюдениями RxJava

Служба уведомлений немного медленная, и важно не переходить к следующему запросу до того, как уведомление будет прочитано, иначе ответ будет потерян.

С этой целью я использовал RxAndroidBle Observables с отдельными каналами для характеристик записи и уведомления. Третий Observable предоставляет запросы. Тем не менее, записи идут слишком быстро.

ConnectableObservable notifyObservable = 
    createNotifyObservable(NOTIFY_UUID).publish(); 

queryObserverable 
    .doOnSubscribe(notifyObservable::connect) 
    .doOnNext(query -> Log.d(TAG, "Processing query: " + query)) 
    .flatMap(query -> createWriteObservable(WRITE_UUID, query))) 
    .doOnNext(request -> Log.d(TAG, "Write initiated.")) 
    .flatMap(request -> notifyObservable) 
    .doOnNext(response -> Log.d(TAG, "Query response: " + response)); 

Так что на запуск приложения, вот что я вижу в журналах (включены метка времени, идентификатор процесса и идентификатор потока):

06-22 14:30:01.991 14085-15360 Processing query: Query1 
06-22 14:30:02.011 14085-15360 Processing query: Query2 
06-22 14:30:02.011 14085-15360 Processing query: Query3 
06-22 14:30:07.261 14085-15443 Write initiated. 
06-22 14:30:07.301 14085-15445 Write initiated 
06-22 14:30:07.321 14085-15447 Query response: Response3 
06-22 14:30:07.321 14085-15449 Write initiated. 
06-22 14:30:07.321 14085-15447 Query response: Response3 
06-22 14:30:07.351 14085-15453 Query response: Response3 

Есть ли способ с RxJava, чтобы гарантировать, что следующий запись происходит только после получения ответа?

EDIT: Задав предложенный maxConnection аргумент как flatMap вызовы, вызовы происходят в правильном порядке, но только для первого запроса от Observable. Вот журнал для этого случая:

06-22 14:39:09.841 22245-23079 Processing query: Query1 
06-22 14:39:15.131 22245-23166 Write initiated. 
06-22 14:39:15.201 22245-23169 Query response: Response1 
+0

Можете ли вы указать идентификатор потока для каждой строки журнала? –

+0

Хорошо, я добавил дополнительные данные из журналов. –

ответ

1

Попробуйте пройти maxConcurrent = 1 аргумент к вашему flatMap вызовов:

... 
.flatMap(query -> createWriteObservable(WRITE_UUID, query), 1) 
... 
.flatMap(request -> createNotifyObservable(NOTIFY_UUID), 1) 
... 
+0

Спасибо, я не знал о аргументе 'maxConcurrent'. Это встречается с частичным успехом, когда все вызывается в правильном порядке. Эта проблема теперь заключается в том, что оценивается только первый запрос из Observable. –

+0

Как выглядит ваш вывод сейчас? – yurgis

+0

Я добавил выход журнала в вопрос. –

1

Если есть ровно один ответ на каждый запрос, то вы можете упаковать ваши запросы в функция:

Observable<String> query(String query) { 
    return notifyObservable.flatMap(notifyBytesObservable -> // to be sure that when we will start writing we're already listening 
    Observable.combineLatest(// with combineLatest both Observables will be subscribed at the same time 
     notifyBytesObservable.first(), // to unsubscribe from notification after the first one 
     createWriteObservable(uuid, query), 
     { notificationBytes, writtenBytes -> notificationBytes } 
    ) 
) 
    .map(notificationBytes -> String(notificationBytes)) 
} 

И теперь вы должны иметь возможность совершать звонки, как это:

queryObserverable 
    .flatMap(queryString -> query(queryString), 1) 

Надеюсь, это вам поможет.

С наилучшими пожеланиями