2016-12-08 3 views
2

Я использую драйвер datastax java 3.1.0 для подключения к кластеру cassandra, а моя кластерная версия cassandra - 2.0.10. Я пишу асинхронно с консистенцией QUORUM.Эффективный способ писать асинхронно в cassandra с помощью драйвера datastax java?

public void save(final String process, final int clientid, final long deviceid) { 
    String sql = "insert into storage (process, clientid, deviceid) values (?, ?, ?)"; 
    try { 
     BoundStatement bs = CacheStatement.getInstance().getStatement(sql); 
     bs.setConsistencyLevel(ConsistencyLevel.QUORUM); 
     bs.setString(0, process); 
     bs.setInt(1, clientid); 
     bs.setLong(2, deviceid); 

     ResultSetFuture future = session.executeAsync(bs); 
     Futures.addCallback(future, new FutureCallback<ResultSet>() { 
     @Override 
     public void onSuccess(ResultSet result) { 
      logger.logInfo("successfully written"); 
     } 

     @Override 
     public void onFailure(Throwable t) { 
      logger.logError("error= ", t); 
     } 
     }, Executors.newFixedThreadPool(10)); 
    } catch (Exception ex) { 
     logger.logError("error= ", ex); 
    } 
    } 

А ниже мой CacheStatement класс:

public class CacheStatement { 
    private static final Map<String, PreparedStatement> cache = 
     new ConcurrentHashMap<>(); 

    private static class Holder { 
    private static final CacheStatement INSTANCE = new CacheStatement(); 
    } 

    public static CacheStatement getInstance() { 
    return Holder.INSTANCE; 
    } 

    private CacheStatement() {} 

    public BoundStatement getStatement(String cql) { 
    Session session = CassUtils.getInstance().getSession(); 
    PreparedStatement ps = cache.get(cql); 
    // no statement cached, create one and cache it now. 
    if (ps == null) { 
     synchronized (this) { 
     ps = cache.get(cql); 
     if (ps == null) { 
      cache.put(cql, session.prepare(cql)); 
     } 
     } 
    } 
    return ps.bind(); 
    } 
} 

Мой выше save метод будет вызываться из нескольких потоков, и я думаю, что BoundStatement не поточно. Класс Btw StatementCache является безопасным потоком, как показано выше.

  • С BoundStatement не является потокобезопасным. Будет ли проблема в моем вышеприведенном коде, если я буду писать асинхронно из нескольких потоков?
  • И, во-вторых, я использую Executors.newFixedThreadPool(10) в параметре addCallback. Это нормально или возникнут проблемы? Или я должен использовать MoreExecutors.directExecutor. В чем же разница между этими двумя? И что это лучший способ для этого?

Ниже моя установка для подключения к Кассандре, используя datastax драйвер Java подключение:

Builder builder = Cluster.builder(); 
    cluster = 
     builder 
      .addContactPoints(servers.toArray(new String[servers.size()])) 
      .withRetryPolicy(new LoggingRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE)) 
      .withPoolingOptions(poolingOptions) 
      .withReconnectionPolicy(new ConstantReconnectionPolicy(100L)) 
      .withLoadBalancingPolicy(
       DCAwareRoundRobinPolicy 
        .builder() 
        .withLocalDc(
         !TestUtils.isProd() ? "DC2" : TestUtils.getCurrentLocation() 
          .get().name().toLowerCase()).withUsedHostsPerRemoteDc(3).build()) 
      .withCredentials(username, password).build(); 
+0

Создайте новый пул потоков каждый раз, когда вызывается сохранение, вместо этого создайте статическую или экземплярную версию пула потоков и повторно используйте его. –

+0

Да, я сделал это после прочтения ниже ответа. Я объявил его окончательным в начале класса и затем использовал его. В общем, в чем разница между 'MoreExecutors.directExecutor()' vs 'threadpool'? – john

+0

@ChrisLohfink Есть ли реальная польза от использования 'MoreExecutors.directExecutor()' vs 'threadpool' в обратном вызове? Можете ли вы помочь мне понять это? – john

ответ

1

Я думаю, что вы делаете это хорошо. Вы можете немного оптимизировать, подготовив все заявления при запуске приложения, так что у вас есть все, что уже кэшировано, поэтому вы не получаете никакого повышения производительности для подготовки оператора при сохранении, и вы не блокируете что-либо в своем рабочем процессе.

BoundStatement не поточно, но PreparedStatement да, и вы возвращаете новую BoundStatement каждый раз, когда вы называете ваш getStatement. Действительно, функция .bind()PreparedStatement на самом деле является ярлыком для new BoundStatement(ps).bind(). И вы не получаете доступ к тем жеBoundStatement из нескольких потоков. Так что ваш код в порядке.

Для пула потоков вместо этого вы фактически создаете новый пул потоков для каждой функции addCallback. Это пустая трата ресурсов. Я не использую этот метод обратного вызова, и я предпочитаю управлять простым FutureResultSet, но я видел examples документацию datastax, в которой используется MoreExecutors.sameThreadExecutor() вместо MoreExecutors.directExecutor().

+0

Да, я прочитал эту статью только перед кодированием этого асинхронного материала .. 'sameThreadExecutor' устарел для' directExecutor', и поэтому я спросил о 'directExecutor' ... Каков наилучший способ и что вы рекомендуете здесь для пула потоков тогда? – john

+0

Пожалуйста, взгляните на [это] (http://stackoverflow.com/a/40794021/6570821), чтобы понять эту идею. – xmas79