2017-02-03 20 views
0

Я пытаюсь синхронизировать доступ к объекту, хранящемуся в Ignite, используя транзакции и обнаруживая, что результаты одной транзакции часто переписывают результаты другого. Я написал на более простую версию того, что я пытаюсь сделать, как тест JUnit для облегчения тестирования:Как изолировать транзакции в Apache Ignite

import junit.framework.TestCase; 
import org.apache.ignite.Ignite; 
import org.apache.ignite.IgniteCache; 
import org.apache.ignite.Ignition; 
import org.apache.ignite.configuration.IgniteConfiguration; 
import org.apache.ignite.transactions.Transaction; 
import org.apache.ignite.transactions.TransactionConcurrency; 
import org.apache.ignite.transactions.TransactionIsolation; 

public class IgniteTransactionTest extends TestCase { 
    private Ignite ignite; 

    @Override 
    protected void setUp() throws Exception { 
     // Start up a non-client Ignite for tests 
     IgniteConfiguration config = new IgniteConfiguration(); 
     ignite = Ignition.getOrStart(config); 
    } 

    public void testTransaction() throws InterruptedException { 
     IgniteCache cache = ignite.getOrCreateCache("cache"); 
     cache.put("counter", 0); 

     Runnable r =() -> { 
      for (int i = 0; i < 1000; i++) { 
       Transaction tx = ignite.transactions().txStart(
         TransactionConcurrency.PESSIMISTIC, TransactionIsolation.SERIALIZABLE); 

       int counter = (int) cache.get("counter"); 
       counter += 1; 
       cache.put("counter", counter); 

       try { 
        tx.commit(); 
       } catch (Exception ex) { 
        System.out.println("Commit failed"); 
        i--; 
       } 
      } 
     }; 

     Thread t1 = new Thread(r); 
     Thread t2 = new Thread(r); 

     t1.start(); 
     t2.start(); 

     t1.join(); 
     t2.join(); 

     assertEquals((int) cache.get("counter"), 2000); 
    } 

    @Override 
    protected void tearDown() throws Exception { 
     ignite.close(); 
    } 
} 

В принципе, работает два отдельных потока пытаются увеличивать счетчик, сохраненную в кэше Ignite, с уровнем изоляции SERIALIZABLE, а затем проверяет правильность значения счетчика. Согласно documentation для воспламенить сделки:

TransactionIsolation.SERIALIZABLE isolation level means that all transactions occur in a completely isolated fashion, as if all transactions in the system had executed serially, one after the other. Read access with this level happens the same way as with TransactionIsolation.REPEATABLE_READ level. However, in TransactionConcurrency.OPTIMISTIC mode, if some transactions cannot be serially isolated from each other, then one winner will be picked and the other transactions in conflict will result in TransactionOptimisticException being thrown.

Но работаешь этот тест производит

junit.framework.AssertionFailedError: expected:<1613> but was:<2000>

что указует на запись в одном потоке может чередоваться между чтением и записью в другом потоке. Кроме того, документация предполагает, что неудавшаяся транзакция должна генерировать исключение при попытке совершить, но этого никогда не происходит.

Я прошу понять, как использовать транзакции в первую очередь? Как я могу изолировать их?

ответ

3

Создается кеш без предоставления CacheConfiguration. По умолчанию был создан Atomic-кеш, и в этом кеше не поддерживаются транзакционные функции.

Вам нужно чего-л, как это:

ignite.getOrCreateCache(new CacheConfiguration<>("cache") 
    .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)); 

 Смежные вопросы

  • Нет связанных вопросов^_^