Я использую вложенное выполнение асинхронного запроса с Cassandra. Данные непрерывно передаются и для каждой входящей информации выполняется нижний блок из cassandra
. Он отлично работает некоторое время, но затем начинает бросать много NoHostAvailableException
.
Пожалуйста, помогите мне здесь. Cassandra Session Код доступа:NoHostAvailable Исключение при работе Async с драйвером Datastax Cassandra
Я использую отдельные сеансы для чтения и записи. Каждая из этих сессий соединяется с другим семенем, так как мне сказали, что это улучшит производительность.
final com.datastax.driver.core.Session readSession = CassandraManager.connect("10.22.1.144", "fr_repo",
"READ");
final com.datastax.driver.core.Session writeSession = CassandraManager.connect("10.1.12.236", "fr_repo",
"WRITE");
CassandraManager.connect
метод ниже:
public static Session connect(String ip, String keySpace,String type) {
PoolingOptions poolingOpts = new PoolingOptions();
poolingOpts.setCoreConnectionsPerHost(HostDistance.REMOTE, 2);
poolingOpts.setMaxConnectionsPerHost(HostDistance.REMOTE, 400);
poolingOpts.setMaxSimultaneousRequestsPerConnectionThreshold(HostDistance.REMOTE, 128);
poolingOpts.setMinSimultaneousRequestsPerConnectionThreshold(HostDistance.REMOTE, 2);
cluster = Cluster
.builder()
.withPoolingOptions(poolingOpts)
.addContactPoint(ip)
.withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE)
.withReconnectionPolicy(new ConstantReconnectionPolicy(100L)).build();
Session s = cluster.connect(keySpace);
return s;
}
базы данных код операции:
ResultSetFuture resultSetFuture = readSession.executeAsync(selectBound.bind(fr.getHashcode()));
Futures.addCallback(resultSetFuture, new FutureCallback<ResultSet>() {
public void onSuccess(com.datastax.driver.core.ResultSet resultSet) {
try {
Iterator<Row> rows = resultSet.iterator();
if (!rows.hasNext()) {
ResultSetFuture resultSetFuture = readSession.executeAsync(selectPrimaryBound
.bind(fr.getPrimaryKeyHashcode()));
Futures.addCallback(resultSetFuture, new FutureCallback<ResultSet>() {
public void onFailure(Throwable arg0) {
}
public void onSuccess(ResultSet arg0) {
Iterator<Row> rows = arg0.iterator();
if (!rows.hasNext()) {
writeSession.executeAsync(insertBound.bind(fr.getHashcode(), fr,
System.currentTimeMillis()));
writeSession.executeAsync(insertPrimaryBound.bind(
fr.getHashcode(),
fr.getCombinedPrimaryKeys(), System.currentTimeMillis()));
produceintoQueue(new Gson().toJson(frCompleteMap));
} else {
writeSession.executeAsync(updateBound.bind(fr,
System.currentTimeMillis(), fr.getHashcode()));
produceintoQueue(new Gson().toJson(frCompleteMap));
}
}
});
} else {
writeSession.executeAsync(updateLastSeenBound.bind(System.currentTimeMillis(),
fr.getHashcode()));
}
} catch (Exception e) {
e.printStackTrace();
}
}