2017-01-20 18 views
6

Я использую драйвер datastax java 3.1.0 для подключения к кластеру cassandra, а моя кластерная версия cassandra - 2.0.10. Я пишу асинхронно с консистенцией QUORUM.Как отправить запрос в cassandra с определенной скоростью, используя Guava RateLimiter?

private final ExecutorService executorService = Executors.newFixedThreadPool(10); 
    private final Semaphore concurrentQueries = new Semaphore(1000); 

    public void save(String process, int clientid, long deviceid) { 
    String sql = "insert into storage (process, clientid, deviceid) values (?, ?, ?)"; 
    try { 
     BoundStatement bs = CacheStatement.getInstance().getStatement(sql); 
     bs.setConsistencyLevel(ConsistencyLevel.QUORUM); 
     bs.setString(0, process); 
     bs.setInt(1, clientid); 
     bs.setLong(2, deviceid); 

     concurrentQueries.acquire(); 
     ResultSetFuture future = session.executeAsync(bs); 
     Futures.addCallback(future, new FutureCallback<ResultSet>() { 
     @Override 
     public void onSuccess(ResultSet result) { 
      concurrentQueries.release(); 
      logger.logInfo("successfully written"); 
     } 

     @Override 
     public void onFailure(Throwable t) { 
      concurrentQueries.release(); 
      logger.logError("error= ", t); 
     } 
     }, executorService); 
    } catch (Exception ex) { 
     logger.logError("error= ", ex); 
    } 
    } 

Мой метод сохранения будет вызываться из нескольких потоков с очень высокой скоростью. Если я пишу на очень высокой скорости, чем может работать мой кластер Cassandra, тогда он начнет бросать ошибки, и я хочу, чтобы все мои записи успешно переходили в cassandra без каких-либо потерь.

Вопрос:

Я думал использовать какое-то от очереди или буфера для епдиеих запросов (например java.util.concurrent.ArrayBlockingQueue). «Buffer full» означает, что клиенты должны ждать. Буфер также будет использоваться для повторной очереди неудачных запросов. Однако, чтобы быть более справедливыми, неудачные запросы, вероятно, должны быть поставлены перед очередью, чтобы они сначала были повторены. Также мы должны как-то справляться с ситуацией, когда очередь заполнена, и одновременно возникают новые неудавшиеся запросы. Затем однопоточный работник выбирает запросы из очереди и отправляет их в Кассандру. Поскольку он не должен делать много, маловероятно, что он станет бутылочным горлом. Этот работник может применять свои собственные ограничения по скорости, например. основанный на сроках с com.google.common.util.concurrent.RateLimiter.

Каков наилучший способ реализации этой функции очереди или буфера, которая может также применять ограничение скорости guava при записи в Cassandra или если есть лучший подход, дайте мне знать также? Я хотел написать в Cassandra с запросом 2000 в секунду (это должно быть настраиваемо, чтобы я мог играть с ним, чтобы узнать, что является оптимальной настройкой).

Как указано ниже в комментариях, если память продолжает увеличиваться, мы можем использовать Guava Cache или CLHM, чтобы сохранить старые записи, чтобы убедиться, что моя программа не исчерпала память. У нас будет около 12 ГБ памяти на коробке, и эти записи очень маленькие, поэтому я не вижу, чтобы это было проблемой.

+1

Не могли бы вы предоставить некоторую информацию об экземплярах и кластере, которые вы используете, плюс оператор create table + немного опишите шаблон доступа для этого. Какой коэффициент репликации вы используете. Обычно записи для cassandra очень быстро, даже на очень скромном кластере вы можете выйти за пределы 2000 req/s. Можете ли вы также проверить, действительно ли инструкция действительно подготовлена ​​и почему-то клиент не готовит заявление каждый раз? Какова скорость, с которой данные будут поступать без реализованной буферизации. Мое чувство кисти - это ваш кластер cassandra, возможно, потребуется немного увеличить/уменьшить –

+0

У нас есть три узла в каждом центре обработки данных с коэффициентом репликации 3. На этой таблице мы будем писать с очень высокой скоростью, а позже мы будем ее читать для некоторого автономного анализа. Да, я один раз кеширую подготовленный отчет, а затем повторно использую это подготовленное заявление. Эта установка кластера cassandra не под моим контролем, так как какая-то другая команда в нашей компании управляет этим, поэтому я хотел убедиться, что по крайней мере мой код не подводит, и мы можем писать все. – john

+0

Мы могли бы использовать другую базу данных, но поскольку мы используем эту базу данных для какой-то другой цели, мы решили использовать ее и для этой цели.Написание на очень высокой скорости убедитесь, что вы не потеряли данные, а затем прочитали эти записи для какого-либо автономного сравнения. Я просто хотел посмотреть, как эта задача в очереди будет работать по сравнению с обычной. Я также хотел реализовать это только для того, чтобы понять, как мы будем эффективно реализовывать это. – john

ответ

2

Если я пишу на очень высокой скорости, чем может работать кластер Cassandra, тогда он начнет метать ошибки, и я хочу, чтобы все мои записи успешно переходили в cassandra без каких-либо потерь.

драйвер Datastax позволяет настроить количество соединений на хост и количество одновременных запросов на соединение (see PoolingOptions settings)

Настройте эти параметры, чтобы уменьшить давление на Кассандру кластере.

 Смежные вопросы

  • Нет связанных вопросов^_^