Я использую драйвер datastax java 3.1.0 для подключения к кластеру cassandra, а моя кластерная версия cassandra - 2.0.10. Я пишу асинхронно с консистенцией QUORUM.Эффективный способ писать асинхронно в cassandra с помощью драйвера datastax java?
public void save(final String process, final int clientid, final long deviceid) {
String sql = "insert into storage (process, clientid, deviceid) values (?, ?, ?)";
try {
BoundStatement bs = CacheStatement.getInstance().getStatement(sql);
bs.setConsistencyLevel(ConsistencyLevel.QUORUM);
bs.setString(0, process);
bs.setInt(1, clientid);
bs.setLong(2, deviceid);
ResultSetFuture future = session.executeAsync(bs);
Futures.addCallback(future, new FutureCallback<ResultSet>() {
@Override
public void onSuccess(ResultSet result) {
logger.logInfo("successfully written");
}
@Override
public void onFailure(Throwable t) {
logger.logError("error= ", t);
}
}, Executors.newFixedThreadPool(10));
} catch (Exception ex) {
logger.logError("error= ", ex);
}
}
А ниже мой CacheStatement
класс:
public class CacheStatement {
private static final Map<String, PreparedStatement> cache =
new ConcurrentHashMap<>();
private static class Holder {
private static final CacheStatement INSTANCE = new CacheStatement();
}
public static CacheStatement getInstance() {
return Holder.INSTANCE;
}
private CacheStatement() {}
public BoundStatement getStatement(String cql) {
Session session = CassUtils.getInstance().getSession();
PreparedStatement ps = cache.get(cql);
// no statement cached, create one and cache it now.
if (ps == null) {
synchronized (this) {
ps = cache.get(cql);
if (ps == null) {
cache.put(cql, session.prepare(cql));
}
}
}
return ps.bind();
}
}
Мой выше save
метод будет вызываться из нескольких потоков, и я думаю, что BoundStatement
не поточно. Класс Btw StatementCache
является безопасным потоком, как показано выше.
- С
BoundStatement
не является потокобезопасным. Будет ли проблема в моем вышеприведенном коде, если я буду писать асинхронно из нескольких потоков? - И, во-вторых, я использую
Executors.newFixedThreadPool(10)
в параметреaddCallback
. Это нормально или возникнут проблемы? Или я должен использоватьMoreExecutors.directExecutor
. В чем же разница между этими двумя? И что это лучший способ для этого?
Ниже моя установка для подключения к Кассандре, используя datastax драйвер Java подключение:
Builder builder = Cluster.builder();
cluster =
builder
.addContactPoints(servers.toArray(new String[servers.size()]))
.withRetryPolicy(new LoggingRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE))
.withPoolingOptions(poolingOptions)
.withReconnectionPolicy(new ConstantReconnectionPolicy(100L))
.withLoadBalancingPolicy(
DCAwareRoundRobinPolicy
.builder()
.withLocalDc(
!TestUtils.isProd() ? "DC2" : TestUtils.getCurrentLocation()
.get().name().toLowerCase()).withUsedHostsPerRemoteDc(3).build())
.withCredentials(username, password).build();
Создайте новый пул потоков каждый раз, когда вызывается сохранение, вместо этого создайте статическую или экземплярную версию пула потоков и повторно используйте его. –
Да, я сделал это после прочтения ниже ответа. Я объявил его окончательным в начале класса и затем использовал его. В общем, в чем разница между 'MoreExecutors.directExecutor()' vs 'threadpool'? – john
@ChrisLohfink Есть ли реальная польза от использования 'MoreExecutors.directExecutor()' vs 'threadpool' в обратном вызове? Можете ли вы помочь мне понять это? – john