2015-01-08 2 views
0

Я использую вложенное выполнение асинхронного запроса с Cassandra. Данные непрерывно передаются и для каждой входящей информации выполняется нижний блок из cassandra. Он отлично работает некоторое время, но затем начинает бросать много NoHostAvailableException.
Пожалуйста, помогите мне здесь. Cassandra Session Код доступа:NoHostAvailable Исключение при работе Async с драйвером Datastax Cassandra

Я использую отдельные сеансы для чтения и записи. Каждая из этих сессий соединяется с другим семенем, так как мне сказали, что это улучшит производительность.

final com.datastax.driver.core.Session readSession = CassandraManager.connect("10.22.1.144", "fr_repo", 
        "READ"); 
      final com.datastax.driver.core.Session writeSession = CassandraManager.connect("10.1.12.236", "fr_repo", 
        "WRITE"); 

CassandraManager.connect метод ниже:

public static Session connect(String ip, String keySpace,String type) { 
      PoolingOptions poolingOpts = new PoolingOptions(); 
      poolingOpts.setCoreConnectionsPerHost(HostDistance.REMOTE, 2); 
      poolingOpts.setMaxConnectionsPerHost(HostDistance.REMOTE, 400); 
      poolingOpts.setMaxSimultaneousRequestsPerConnectionThreshold(HostDistance.REMOTE, 128); 
      poolingOpts.setMinSimultaneousRequestsPerConnectionThreshold(HostDistance.REMOTE, 2); 
      cluster = Cluster 
      .builder() 
      .withPoolingOptions(poolingOpts) 
      .addContactPoint(ip) 
      .withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE) 
      .withReconnectionPolicy(new ConstantReconnectionPolicy(100L)).build(); 
      Session s = cluster.connect(keySpace); 
      return s; 
    } 

базы данных код операции:

ResultSetFuture resultSetFuture = readSession.executeAsync(selectBound.bind(fr.getHashcode())); 
        Futures.addCallback(resultSetFuture, new FutureCallback<ResultSet>() { 
         public void onSuccess(com.datastax.driver.core.ResultSet resultSet) { 
          try { 
           Iterator<Row> rows = resultSet.iterator(); 
           if (!rows.hasNext()) { 
            ResultSetFuture resultSetFuture = readSession.executeAsync(selectPrimaryBound 
              .bind(fr.getPrimaryKeyHashcode())); 
            Futures.addCallback(resultSetFuture, new FutureCallback<ResultSet>() { 
             public void onFailure(Throwable arg0) { 

             } 

             public void onSuccess(ResultSet arg0) { 
              Iterator<Row> rows = arg0.iterator(); 
              if (!rows.hasNext()) { 
               writeSession.executeAsync(insertBound.bind(fr.getHashcode(), fr, 
                 System.currentTimeMillis())); 
                writeSession.executeAsync(insertPrimaryBound.bind(
                  fr.getHashcode(), 
                  fr.getCombinedPrimaryKeys(), System.currentTimeMillis())); 
               produceintoQueue(new Gson().toJson(frCompleteMap)); 

              } else { 
               writeSession.executeAsync(updateBound.bind(fr, 
                 System.currentTimeMillis(), fr.getHashcode())); 
               produceintoQueue(new Gson().toJson(frCompleteMap)); 
              } 
             } 

            }); 

           } else { 
            writeSession.executeAsync(updateLastSeenBound.bind(System.currentTimeMillis(), 
             fr.getHashcode())); 

           } 
          } catch (Exception e) { 
           e.printStackTrace(); 
          } 

         } 

ответ

3

Это звучит, как вы отправляете больше запросов, чем ваш бассейн/кластер может обрабатывать. Это довольно легко сделать, когда вы никогда не ожидаете результата, как в вашем коде. Вы, по сути, просто бросаете столько запросов, сколько сможете, в конвейер без блокировки, и нет естественного обратного давления, чтобы замедлить ваше приложение, если резервное копирование пула или кластера. Поэтому, если ваш объем запросов слишком высок, в конечном итоге все хосты будут заняты резервной рабочей очередью. Вы можете использовать nodetool tpstats, чтобы посмотреть, как выглядят ваши очереди запросов на каждом узле.