10

Я использую драйвер datastax java 3.1.0 для подключения к кластеру cassandra, а моя кластерная версия cassandra - 2.0.10. Я пишу асинхронно с консистенцией QUORUM.Как сжимать запрос на запись в cassandra при работе с «executeAsync»?

private final ExecutorService executorService = Executors.newFixedThreadPool(10); 

    public void save(String process, int clientid, long deviceid) { 
    String sql = "insert into storage (process, clientid, deviceid) values (?, ?, ?)"; 
    try { 
     BoundStatement bs = CacheStatement.getInstance().getStatement(sql); 
     bs.setConsistencyLevel(ConsistencyLevel.QUORUM); 
     bs.setString(0, process); 
     bs.setInt(1, clientid); 
     bs.setLong(2, deviceid); 

     ResultSetFuture future = session.executeAsync(bs); 
     Futures.addCallback(future, new FutureCallback<ResultSet>() { 
     @Override 
     public void onSuccess(ResultSet result) { 
      logger.logInfo("successfully written"); 
     } 

     @Override 
     public void onFailure(Throwable t) { 
      logger.logError("error= ", t); 
     } 
     }, executorService); 
    } catch (Exception ex) { 
     logger.logError("error= ", ex); 
    } 
    } 

My above save method будет вызываться из нескольких потоков с очень высокой скоростью.

Вопрос:

Я хочу задушить запрос executeAsync метод, который записывает асинхронно в Кассандре. Если я пишу на очень высокой скорости, чем может работать мой кластер Cassandra, тогда он начнет бросать ошибки, и я хочу, чтобы все мои записи успешно переходили в cassandra без каких-либо потерь.

Я видел это post, где решение должно использовать Semaphore с фиксированным количеством разрешений. Но я не уверен, как и как лучше всего это реализовать. Раньше я никогда раньше не использовал Семафор. Это логика. Может ли кто-нибудь представить пример с базой Семафора в моем коде или если есть лучший способ/вариант, тогда дайте мне знать.

В контексте написания программы dataloader, вы могли бы сделать что-то как следующее:

  • Для простоты использования семафора или какой-либо другой конструкции с фиксированным количеством разрешений (что быть вашим максимальным количеством потоков запросов). Когда вы отправляете запрос с использованием executeAsync, получите разрешение. Вам действительно понадобятся только один поток (но может потребоваться , чтобы ввести пул размером ядра # cpu, который делает это), который получает разрешения от Семафора и выполняет запросы. Это будет только блок на приобретение, пока не будет доступного разрешения.
  • Используйте Futures.addCallback для будущего, возвращенного из executeAsync. Обратный вызов должен вызывать Sempahore.release() как onSuccess, так и onFailure. Отпустив разрешение, это должно позволить вашему потоку на шаге 1 продолжить и отправить следующий запрос.

Кроме того, я видел пару других post, где они говорили об использовании RingBuffer или Guava RateLimitter так, какой из них лучше, и я должен использовать? Ниже приведены параметры, я могу думать:

  • Использование семафор
  • Использование кольцевой буфер
  • Использование гуавы Rate Limiter

Может кто-нибудь помочь мне пример того, как мы можем душить запрос или получить противодавление для записи cassandra и убедиться, что все записи успешно переходят в cassandra?

ответ

4

Не авторитетный ответ, но, возможно, это было бы полезно. Сначала вы должны рассмотреть, что бы вы сделали, если запрос не может быть выполнен сразу. Независимо от того, какой тарифный лимит вы выбрали, если вы получите запросы по более высокой ставке, чем вы можете написать в Cassandra, в конечном итоге вы получите свой процесс, забитый ожиданиями.И в этот момент вам нужно будет сказать своим клиентам некоторое время отложить свои запросы («отбросить назад»). Например. если они поступают через HTTP, тогда статус ответа будет 429 «Слишком много запросов». Если вы генерируете запросы в одном и том же процессе, тогда выберите, какой самый длинный тайм-аут будет приемлемым. Тем не менее, если Кассандра не сможет справиться с этим, пришло время масштабировать (или настраивать) его.

Возможно, перед тем, как внедрить ограничения скорости, стоит поэкспериментировать и добавить искусственные задержки в ваших потоках перед вызовом метода save (используя Thread.sleep (...)) и посмотреть, решает ли он вашу проблему или что-то еще необходимо.

Ошибка возврата запроса is противодавление от Cassandra. Но вы можете выбрать или реализовать RetryPolicy, чтобы определить, когда повторить неудавшиеся запросы.

Также вы можете посмотреть connection pool options (и особенно Monitoring and tuning the pool). Можно настроить число асинхронных requests per connection. Однако документация говорит, что для Кассандры 2.x этот параметр патрубки 128 и один не должен изменить его (я бы поэкспериментировать с ним, хотя :)

Реализация с семафором выглядит

/* Share it among all threads or associate with a thread for per-thread limits 
    Number of permits is to be tuned depending on acceptable load. 
*/ 
final Semaphore queryPermits = new Semaphore(20); 


public void save(String process, int clientid, long deviceid) { 
    .... 
    queryPermits.acquire(); // Blocks until a permit is available 

    ResultSetFuture future = session.executeAsync(bs); 
    Futures.addCallback(future, new FutureCallback<ResultSet>() { 
    @Override 
    public void onSuccess(ResultSet result) { 
     queryPermits.release(); 
     logger.logInfo("successfully written"); 
    } 
    @Override 
    public void onFailure(Throwable t) { 
     queryPermits.release(); // Permit should be released in all cases. 
     logger.logError("error= ", t); 
    } 
    }, executorService); 
    .... 
} 

(В реальном коде Я бы создал обратный вызов обертки, который выдавал бы разрешения, а затем вызывал обернутые методы)

Guava RateLimiter похож на семафор, но позволяет временные всплески после периодов недоиспользования и ограничивает запросы на основе хронометража (не общее количество активных запросов).

Однако запросы по каким-либо причинам не сработают, поэтому, вероятно, лучше иметь план, как их повторять (в случае прерывистых ошибок).

Возможно, это не подходит в вашем случае, но я попытаюсь использовать некоторую очередь или буфер для запроса очереди (например, java.util.concurrent.ArrayBlockingQueue). «Buffer full» означает, что клиенты должны ждать или отказаться от запроса. Буфер также будет использоваться для повторной очереди неудачных запросов. Однако, чтобы быть более справедливыми, неудачные запросы, вероятно, должны быть поставлены перед очередью, чтобы они сначала были повторены. Также нужно как-то справляться с ситуацией, когда очередь заполнена, и одновременно появляются новые неудавшиеся запросы. Затем однопоточный работник выбирает очередь запросов и отправляет их в Кассандру. Поскольку он не должен делать много, маловероятно, что он станет бутылочным горлом. Этот работник может также применять свои собственные ограничения по скорости, например. основанный на сроках с com.google.common.util.concurrent.RateLimiter.

Если вы хотите избежать потери сообщений в максимально возможной степени, он может поставить брокера сообщений с настойчивостью (например, Кафкой) перед Кассандрой. Таким образом, входящие сообщения могут выдержать даже длительные перебои в Кассандре. Но, наверное, в вашем случае это слишком много.

+0

Как вы думаете, вы можете предоставить мне пример для очереди или буфер пример вы мне дали? Я думаю, что это подойдет мне лучше всего в моем сценарии. – john

1

Просто использование блокирующей очереди должно сделать это хорошо. Фьючерсы имеют резьбу, и там обратный вызов (успех и отказ) будет действовать как потребитель, и везде, где вы вызываете метод сохранения, будет действовать как производитель.

Еще лучше, вы ставите полный запрос в очередь и выгружаете его один за другим, сохраняя при каждом удалении.

private final ExecutorService executorService = Executors.newFixedThreadPool(10); 

public void save(String process, int clientid, long deviceid, BlockingQueue<Object> queue) { 
    String sql = "insert into storage (process, clientid, deviceid) values (?, ?, ?)"; 
    try { 
     BoundStatement bs = CacheStatement.getInstance().getStatement(sql); 
     bs.setConsistencyLevel(ConsistencyLevel.QUORUM); 
     bs.setString(0, process); 
     bs.setInt(1, clientid); 
     bs.setLong(2, deviceid); 

     ResultSetFuture future = session.executeAsync(bs); 
     Futures.addCallback(future, new FutureCallback<ResultSet>() { 
     @Override 
     public void onSuccess(ResultSet result) { 
      logger.logInfo("successfully written"); 
      queue.take(); 
     } 

     @Override 
     public void onFailure(Throwable t) { 
      logger.logError("error= ", t); 
      queue.take(); 
     } 
     }, executorService); 
    } catch (Exception ex) { 
     logger.logError("error= ", ex); 
    } 
} 

public void invokeSaveInLoop(){ 
    Object dummyObj = new Object(); 
    BlockingQueue<Object> queue = new ArrayBlockingQueue<>(20);; 
    for(int i=0; i< 1000; i++){ 
     save("process", clientid, deviceid, queue); 
     queue.put(dummyObj); 
    } 
} 

Если вы хотите пойти дальше и проверить нагрузку на кластерной середине пути

public static String getCurrentState(){  
StringBuilder response = new StringBuilder(); 
      response.append("Current Database Connection Status <br>\n ---------------------------------------------<br>\n"); 
      final LoadBalancingPolicy loadBalancingPolicy = 
        cluster.getConfiguration().getPolicies().getLoadBalancingPolicy(); 
      final PoolingOptions poolingOptions = 
        cluster.getConfiguration().getPoolingOptions(); 
      Session.State state = session.getState(); 
      for (Host host : state.getConnectedHosts()) { 
       HostDistance distance = loadBalancingPolicy.distance(host); 
       int connections = state.getOpenConnections(host); 
       int inFlightQueries = state.getInFlightQueries(host); 
       response.append(String.format("%s current connections=%d, max allowed connections=%d, current load=%d, max load=%d%n", 
           host, connections, poolingOptions.getMaxConnectionsPerHost(distance), inFlightQueries, 
           connections * 
             poolingOptions.getMaxRequestsPerConnection(distance))) 
         .append("<br>\n"); 
      } 
      return response.toString(); 
}