Я использую драйвер datastax java 3.1.0 для подключения к кластеру cassandra, а моя кластерная версия cassandra - 2.0.10. Я пишу асинхронно с консистенцией QUORUM.Как сжимать запрос на запись в cassandra при работе с «executeAsync»?
private final ExecutorService executorService = Executors.newFixedThreadPool(10);
public void save(String process, int clientid, long deviceid) {
String sql = "insert into storage (process, clientid, deviceid) values (?, ?, ?)";
try {
BoundStatement bs = CacheStatement.getInstance().getStatement(sql);
bs.setConsistencyLevel(ConsistencyLevel.QUORUM);
bs.setString(0, process);
bs.setInt(1, clientid);
bs.setLong(2, deviceid);
ResultSetFuture future = session.executeAsync(bs);
Futures.addCallback(future, new FutureCallback<ResultSet>() {
@Override
public void onSuccess(ResultSet result) {
logger.logInfo("successfully written");
}
@Override
public void onFailure(Throwable t) {
logger.logError("error= ", t);
}
}, executorService);
} catch (Exception ex) {
logger.logError("error= ", ex);
}
}
My above save method будет вызываться из нескольких потоков с очень высокой скоростью.
Вопрос:
Я хочу задушить запрос executeAsync
метод, который записывает асинхронно в Кассандре. Если я пишу на очень высокой скорости, чем может работать мой кластер Cassandra, тогда он начнет бросать ошибки, и я хочу, чтобы все мои записи успешно переходили в cassandra без каких-либо потерь.
Я видел это post, где решение должно использовать Semaphore
с фиксированным количеством разрешений. Но я не уверен, как и как лучше всего это реализовать. Раньше я никогда раньше не использовал Семафор. Это логика. Может ли кто-нибудь представить пример с базой Семафора в моем коде или если есть лучший способ/вариант, тогда дайте мне знать.
В контексте написания программы dataloader, вы могли бы сделать что-то как следующее:
- Для простоты использования семафора или какой-либо другой конструкции с фиксированным количеством разрешений (что быть вашим максимальным количеством потоков запросов). Когда вы отправляете запрос с использованием executeAsync, получите разрешение. Вам действительно понадобятся только один поток (но может потребоваться , чтобы ввести пул размером ядра # cpu, который делает это), который получает разрешения от Семафора и выполняет запросы. Это будет только блок на приобретение, пока не будет доступного разрешения.
- Используйте Futures.addCallback для будущего, возвращенного из executeAsync. Обратный вызов должен вызывать Sempahore.release() как onSuccess, так и onFailure. Отпустив разрешение, это должно позволить вашему потоку на шаге 1 продолжить и отправить следующий запрос.
Кроме того, я видел пару других post, где они говорили об использовании RingBuffer
или Guava RateLimitter
так, какой из них лучше, и я должен использовать? Ниже приведены параметры, я могу думать:
- Использование семафор
- Использование кольцевой буфер
- Использование гуавы Rate Limiter
Может кто-нибудь помочь мне пример того, как мы можем душить запрос или получить противодавление для записи cassandra и убедиться, что все записи успешно переходят в cassandra?
Как вы думаете, вы можете предоставить мне пример для очереди или буфер пример вы мне дали? Я думаю, что это подойдет мне лучше всего в моем сценарии. – john