У меня есть приложение для чата, которое связывается с сервером nodejs (написанным мной) с помощью socket.io. Несколько человек могут общаться друг с другом по принципу «один к одному». Пользовательский интерфейс приложения похож на WhatsApp. Существуют различные Chat Threads
где Users
может обмениваться Chat Messages
. Они должны храниться на SQLite db.RxJava с SQLite для динамического обновления чата UI
Чтобы убедиться, что приложение работает, даже если оно выключено, я инициировал соединение socket.io в сервисе.
Когда приходит новое сообщение, заказ, таким образом,
- служба анализирует ответ и записывает его в БД SQLite
- Если приложение отключено, уведомление создается и показывается пользователю
- Если приложение включено, а пользователь находится в том же потоке чата, на котором было отправлено новое сообщение, необходимо обновить
recyclerView
. - В основном ощущение такое же, как WhatsApp, кроме я использую Socket.io и не XMPP
Я хочу использовать выполнять операции CRUD на SQLite
с помощью RxJava
.
связанные SQLiteHelper.Class
public List<ChatMessage> getChatMessagesForThread(String threadName){
List<ChatMessage>list = new ArrayList<>();
String query = "Some Query Here";
SQLiteDatabase db = this.getReadableDatabase();
Cursor c = db.rawQuery(query, null);
if (c.moveToFirst()){
do {
ChatMessage message = new ChatMessage();
message.setMessage(c.getString((c.getColumnIndex(CHAT_MESSAGES_KEY_MESSAGE))));
message.setChatThread(c.getInt(c.getColumnIndex(CHAT_MESSAGES_KEY_CHAT_THREAD)));
message.setUser(c.getString(c.getColumnIndex(CHAT_MESSAGES_KEY_USER)));
list.add(message);
} while (c.moveToNext());
}else {
//No such thread exists. Returns null
}
c.close();
return list;
}
Rx Java функции
public static <T> Observable<T> makeObservable(final Callable<T> func) {
return Observable.create(
new Observable.OnSubscribe<T>() {
@Override
public void call(Subscriber<? super T> subscriber) {
try {
T observed = func.call();
if (observed != null) { // to make defaultIfEmpty work
subscriber.onNext(observed);
}
//TODO: DECIDE IF THIS STAYS OR NOT !!!
//subscriber.onCompleted();
} catch (Exception ex) {
subscriber.onError(ex);
}
}
}).subscribeOn(Schedulers.io());
}
@SuppressWarnings("unchecked")
private Callable<List<ChatMessage>> getData(String threadName) {
return new Callable() {
public List<ChatMessage> call() {
return getChatMessagesForThread(String threadName);
}
};
}
public Observable<List<ChatMessage>> getDataObservable(String threadName) {
return makeObservable(getData(String threadName));
}
В ChatActivity.class
databaseHelper.getDataObservable(String currentThreadName)
.subscribe(new Action1<List<ChatMessage>>() {
@Override
public void call(List<ChatMessage> chatMessages) {
for (ChatMessage message : chatMessages){
//Add to list and update recyclerView
mAdapter.notifyDatasetChanged();
}
}
});
Проблема, вызов Rxjava выше получает выполняется только один раз, а не каждый раз, когда новый запись работа делается.
Этой функция в классе SQLiteHelper и вызывается из моей службы
public long writeMessageToDB(ChatMessage message){
ContentValues values = new ContentValues();
values.put(CHAT_MESSAGES_KEY_MESSAGE, message.getMessage());
values.put(CHAT_MESSAGES_KEY_CHAT_THREAD, message.getChatThread());
values.put(CHAT_MESSAGES_KEY_USER, message.getUser());
SQLiteDatabase db = this.getWritableDatabase();
return db.insert(CHAT_MESSAGES_TABLE_NAME, null, values);
}
Я читал о SQL Delite по площади, но хочу, чтобы понять, как получить вышеуказанную работу. Пожалуйста помоги!
Отложить создание наблюдаемого только после того, как он имеет хотя бы одного абонента. Новый наблюдаемый создается при выполнении новой подписки, а не при изменении базовых данных. Это не решает мою проблему, так как подписка на наблюдаемый будет выполняться только один раз в 'ChatActivity' –
другим способом решения этой проблемы - использовать Realm. Я использую это сейчас. Вы можете зарегистрировать обратный вызов изменений для любого запроса и уведомить пользовательский интерфейс, отправив событие в PublishSubject или EventBus –
У меня есть причины придерживаться SQLite. Также это поможет мне изучить RxJava гораздо более подробно. Спасибо, в любом случае –