2013-04-04 2 views
3

Я хочу вставить около 50 миллионов строк (~ 30 столбцов каждый) в cassandra, в настоящее время только 1 узел.вставить большой объем данных в cassandra эффективно

Я запрашиваю данные из другого источника данных и сохраняю их в объекте таблицы. Я перебираю через синтаксический анализ каждого из строк отдельно, а затем добавляю его к мутатору. В настоящее время я вставляю 100 строк за раз, а 1 миллион строк занимает 40 минут! Как ускорить этот процесс? (Я также пробовал client.batch_mutate(), но он, похоже, сбросил ошибку соединения после нескольких тысяч вставок блоказ 2).

Через поиск вокруг я вижу, что многопоточность может помочь. Но я не мог найти примеров, может ли кто-нибудь связать меня? Спасибо !!

Мой текущий код:

 List<String> colNames = new ArrayList<String>(); 
     List<String> colValues = new ArrayList<String>(); 
     SomeTable result = Query(...); // this contains my result set of 1M rows initially 

     for (Iterator itr = result.getRecordIterator(); itr.hasNext();) { 
       String colName =..... 
       String colValue = ..... 

      int colCount = colNames.size(); // 100 * 30 

      for (int i = 0; i < colCount; i++) { 
       //add row keys and columns to mutator 
       mutator.addInsertion(String.valueOf(rowCounter), "data", HFactory.createStringColumn(colNames.get(i), colValues.get(i))); 
      } 
      rowCounter++; 

      //insert rows of block size 100 
      if (rowCounter % 100==0) { 

       mutator.execute(); 
       //clear data 
       colNames = new ArrayList<String>(); 
       colValues = new ArrayList<String>(); 
       mutator = HFactory.createMutator(keyspace, stringSerializer); 
      } 

     } 

ответ

2

Многопоточность поможет много, да. На данный момент вы используете одно соединение в Cassandra, что означает, что вы используете только один поток внутри Cassandra. Вам нужно использовать несколько соединений, для которых требуется несколько потоков в вашем клиенте.

Один из способов - использовать Java ThreadPoolExecutor и обернуть ваш mutator.execute() в runnable и выполнить его в пуле потоков. Позаботьтесь об исключениях. Вы также должны использовать BlockingQueue, чтобы ограничить количество чередующихся мутаций, если вы читаете свой источник быстрее, чем Cassandra может вставлять.

С этим установите размер пула соединений в Hector примерно на 10, и ваши вставки должны быть значительно быстрее.

На боковой ноте, если вы не знали, Cassandra не предназначен для работы с одним узлом. Я предполагаю, что вы намерены масштабировать и добавлять репликацию. Если нет, то вы, вероятно, найдете альтернативное решение более совершенным и более простым для ваших нужд. Несколько соединений и потоков становятся особенно важными при использовании нескольких узлов, поэтому скорость вставки может масштабироваться.

+0

благодарит за ваш ответ! Так что мне понадобится несколько узлов, если я хочу сделать мой клиент многопоточным? Я не знаю многопоточности, интересно, знаете ли вы о каких-либо хороших примерах нескольких кассандр в Интернете? Да, сейчас я нахожусь в тестировании и позже расширяюсь до большего количества узлов. @Richard –

+0

Нет, у вас может быть много соединений на узел, что все, что вам нужно, чтобы сделать ваш клиент многопоточным. Я не знаю примеров Cassandra, но javadoc для ThreadPoolExecutor хорош http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ThreadPoolExecutor.html и есть учебник по Java threading here http://docs.oracle.com/javase/tutorial/essential/concurrency/index.html – Richard

+0

Еще раз спасибо @Richard. Извините за другой вопрос о новичке. Похоже, я оберну свой код поверх Runnable и создаю много потоков и отправляю разные объекты «Таблица». Мой вопрос: должен ли я использовать создание нового объекта Cluster/Mutator/Keyspace для каждого из потоков или того же самого? –