0

У меня есть приложение для чата, которое связывается с сервером nodejs (написанным мной) с помощью socket.io. Несколько человек могут общаться друг с другом по принципу «один к одному». Пользовательский интерфейс приложения похож на WhatsApp. Существуют различные Chat Threads где Users может обмениваться Chat Messages. Они должны храниться на SQLite db.RxJava с SQLite для динамического обновления чата UI

Чтобы убедиться, что приложение работает, даже если оно выключено, я инициировал соединение socket.io в сервисе.

Когда приходит новое сообщение, заказ, таким образом,

  • служба анализирует ответ и записывает его в БД SQLite
  • Если приложение отключено, уведомление создается и показывается пользователю
  • Если приложение включено, а пользователь находится в том же потоке чата, на котором было отправлено новое сообщение, необходимо обновить recyclerView.
  • В основном ощущение такое же, как WhatsApp, кроме я использую Socket.io и не XMPP

Я хочу использовать выполнять операции CRUD на SQLite с помощью RxJava.

связанные SQLiteHelper.Class

public List<ChatMessage> getChatMessagesForThread(String threadName){ 
    List<ChatMessage>list = new ArrayList<>(); 
    String query = "Some Query Here"; 

    SQLiteDatabase db = this.getReadableDatabase(); 
    Cursor c = db.rawQuery(query, null); 

    if (c.moveToFirst()){ 

     do { 
      ChatMessage message = new ChatMessage(); 
      message.setMessage(c.getString((c.getColumnIndex(CHAT_MESSAGES_KEY_MESSAGE)))); 
      message.setChatThread(c.getInt(c.getColumnIndex(CHAT_MESSAGES_KEY_CHAT_THREAD))); 
      message.setUser(c.getString(c.getColumnIndex(CHAT_MESSAGES_KEY_USER))); 
      list.add(message); 

     } while (c.moveToNext()); 

    }else { 
     //No such thread exists. Returns null 
    } 
    c.close(); 
    return list; 
} 

Rx Java функции

public static <T> Observable<T> makeObservable(final Callable<T> func) { 
     return Observable.create(
       new Observable.OnSubscribe<T>() { 
        @Override 
        public void call(Subscriber<? super T> subscriber) { 
         try { 
          T observed = func.call(); 
          if (observed != null) { // to make defaultIfEmpty work 
           subscriber.onNext(observed); 
          } 
          //TODO: DECIDE IF THIS STAYS OR NOT !!! 
          //subscriber.onCompleted(); 
         } catch (Exception ex) { 
          subscriber.onError(ex); 
         } 
        } 
       }).subscribeOn(Schedulers.io()); 
    } 

@SuppressWarnings("unchecked") 
private Callable<List<ChatMessage>> getData(String threadName) { 
    return new Callable() { 
     public List<ChatMessage> call() { 
      return getChatMessagesForThread(String threadName); 
     } 
    }; 
} 


    public Observable<List<ChatMessage>> getDataObservable(String threadName) { 
     return makeObservable(getData(String threadName)); 
    } 

В ChatActivity.class

databaseHelper.getDataObservable(String currentThreadName) 
      .subscribe(new Action1<List<ChatMessage>>() { 
       @Override 
       public void call(List<ChatMessage> chatMessages) { 
        for (ChatMessage message : chatMessages){ 
         //Add to list and update recyclerView 
         mAdapter.notifyDatasetChanged(); 
        } 
       } 
      }); 

Проблема, вызов Rxjava выше получает выполняется только один раз, а не каждый раз, когда новый запись работа делается.

Этой функция в классе SQLiteHelper и вызывается из моей службы

public long writeMessageToDB(ChatMessage message){ 
    ContentValues values = new ContentValues(); 
    values.put(CHAT_MESSAGES_KEY_MESSAGE, message.getMessage()); 
    values.put(CHAT_MESSAGES_KEY_CHAT_THREAD, message.getChatThread()); 
    values.put(CHAT_MESSAGES_KEY_USER, message.getUser()); 
    SQLiteDatabase db = this.getWritableDatabase(); 
    return db.insert(CHAT_MESSAGES_TABLE_NAME, null, values); 
} 

Я читал о SQL Delite по площади, но хочу, чтобы понять, как получить вышеуказанную работу. Пожалуйста помоги!

ответ

0

Да, вы должны использовать оператор .defer(), если вы хотите, чтобы наблюдаемый был создан каждый раз.

Observable.defer(()-> Observable.just("some arg")) 
.subscribeOn(Schedulers.io)) 
.map(t -> getDataFromDb()) 
.map(v -> { 
    // handle your result 
}) 
.subscribe(..); 
+0

Отложить создание наблюдаемого только после того, как он имеет хотя бы одного абонента. Новый наблюдаемый создается при выполнении новой подписки, а не при изменении базовых данных. Это не решает мою проблему, так как подписка на наблюдаемый будет выполняться только один раз в 'ChatActivity' –

+0

другим способом решения этой проблемы - использовать Realm. Я использую это сейчас. Вы можете зарегистрировать обратный вызов изменений для любого запроса и уведомить пользовательский интерфейс, отправив событие в PublishSubject или EventBus –

+0

У меня есть причины придерживаться SQLite. Также это поможет мне изучить RxJava гораздо более подробно. Спасибо, в любом случае –

 Смежные вопросы

  • Нет связанных вопросов^_^