2017-01-19 9 views
1

Предполагая, что у меня есть некоторый поток элементов:базы данных (SQLite) транзакции для последующих операторов с rxJava

Observable<Item> stream = ... 

Что я пытаюсь достичь?

  • Поток имеет любое количество операций. Все операции до начала транзакции должны выполняться вне транзакции
  • Как-то начать транзакцию в середине потока db.beginTransaction()
  • Все операторы после запуска транзакции должны быть запущены внутри транзакции
  • Сделки должны быть завершены в случае успешных операций db.setTransactionSuccessful
  • сделка должна быть всегда заканчивалось db.endTransaction
  • Это будет здорово иметь и фрагменты: открытые одна сделка для всех элементов в последующих операциях; открытия и закрытия транзакции для каждого элемента в потоке

//some upstream operators 
stream.doOnNext(i -> ...) 
    .map(i -> ...) 
    //somehow start transaction here 
    //operator inside transaction. All database changes will be reverted in case error 
    .doOnNext(i -> /*database ops*/) 
    .subscribe() 

PS: DB является применение областью действия экземпляра записываемой SQLiteDatabase

У меня есть решение в настоящее время. Но, может быть, у вас есть какие-то предложения относительно более чистого способа?

+0

Вы хотите отдельные транзакции для каждого из '' '' '' '' '' '' '' '' 'исхода от источника? Что такое «база данных»? Это соединение для этого конкретного элемента или некоторой глобальной переменной? Если он глобальный, транзакция также будет глобальной.Вы хотите, чтобы транзакция продолжалась до тех пор, пока все элементы не обработаны? –

+0

'database' - это глобальный экземпляр (область применения) записываемой' SQLiteDatabase'. Я хочу открыть одну транзакцию для обработки всех элементов потока. Но новая транзакция для каждого элемента также интересна. Вы упомянули, что такая сделка глобальна, она что-то ограничивает? – Beloo

+1

Глобальная транзакция выглядит странно для меня. Это означает, что одна транзакция будет включать все действия базы данных, которые происходят в вашей программе одновременно. Как следствие, это не имеет большого значения, когда вы начинаете свою транзакцию. Каждая операция в вашем конвейере будет повторяться для каждого элемента. Первый элемент должен открыть транзакцию, и * все * операции для последующих элементов будут находиться внутри этой транзакции. –

ответ

2

1) Для случая, когда все элементы обрабатываются в одной транзакции:

stream 
    .doOnSubscribe(d -> database.beginTransaction()) 
    . ... 
    .subscribe(v -> {...}, 
     e -> database.endTransaction(), 
     () -> { database.setTransactionSuccessful(); database.endTransaction(); }) 

2) для случая, когда имеется отдельная транзакция для каждого элемента:

class ItemWithTransaction { 
    Item item; 
    Connection conn; // connection associated with this item 
    boolean rollback; 
} 

stream 
    .map(i -> new ItemWithTransaction(i, openTransaction())) 
    .map(i -> i.conn.executeSql(..., i.item.prop1)) 
    . ... 
    .map(i -> { 
     if (...) i.rollback = true; // error related to this item 
     return i; 
    }) 
    . ... 
    .subscribe(i -> { 
      ... 
      if (i.rollback) i.conn.rollback(); 
      else i.conn.commit(); 
      i.conn.close(); 
     }, 
     e -> rollbackAndCloseAllOpenConnections(), 
     () -> {...}) 

Вообще я не был бы идти со вторым подходом, так как это может потребовать слишком много (неконтролируемых) одновременных подключений к базе данных.

3) Вы бы лучше реструктурировали свой код, чтобы сначала собрать всю необходимую информацию, а затем обновить базу данных за один раз в короткой транзакции. Вот так я бы сделал это:

stream 
    . ... // processing 
    .buffer(...) // collect updates all or in batches 
    .subscribe(Collection<ItemUpdate> batch -> { 
      database.beginTransaction(); 
      try { 
       ... // update multiple items 
       database.setTransactionSuccessful(); 
      } finally { 
       database.endTransaction(); 
      } 
     }, 
     e -> {...}, 
     () -> {...}); 
+0

Согласен. Первый подход лучше, но в вашей реализации соединение с базой данных открывается в 'doOnSubscribe', поэтому оно влияет на все потоковые операции, включая восходящий поток. Например, если поток запущен из наблюдаемого элемента «Retrofit», он заставляет транзакцию открываться намного дольше, чем она должна быть, поэтому я использовал для этого 'doOnNext'. – Beloo

+0

@Beloo Если в потоке есть более одного элемента, это все равно повлияет на восходящий поток. Я добавил реальное решение как номер 3 к моему ответу. –

1

Я создал трансформатор для достижения один сделки по всем пунктам потока:

/** @return transformer which starts SQLite database transaction for each downstream operators, 
* closes transaction in {@link Observable#doOnTerminate}. So transaction will be closed either on successful completion or error of stream. 
* set previously opened SQLite database transaction to completed in {@link Observable#doOnCompleted} call */ 
public <T> Observable.Transformer<T, T> inTransaction() { 
    return observable -> observable 
      .doOnNext(o -> { 
       if (!database.inTransaction()) database.beginTransaction(); 
      }) 
      .doOnCompleted(() -> { 
       if (database.inTransaction()) database.setTransactionSuccessful(); 
      }) 
      .doOnTerminate(() -> { 
       if (database.inTransaction()) database.endTransaction(); 
      }); 

И называть его:

stream 
    //start transaction here 
    .compose(inTransaction()) 
    .doOnNext(i -> /*database ops*/) 
    .subscribe() 

Обратите внимание, что я начать транзакцию в .doOnNext и проверять каждый раз, если транзакция уже началась, потому что кажется, что ее невозможно назвать только в первый раз.

+0

Это прекрасно работает для вас? Я использую 'ActiveAndroid'. @Beloo –

+0

Да, кажется, что он работает как ожидалось. с зеленой базой dao, подумал – Beloo