2016-04-18 2 views
4

Я использую службу, которая читает сообщения от Kafka и толкает их в Cassandra.Java Threading: чрезмерное использование ЦП

Я использую многопоточную архитектуру для того же самого.

Есть, скажем, k threads, потребляющий предмет Kafka. Эти записи в очереди, заявленный как:

public static BlockingQueue<> 

В настоящее время существует целый ряд нитей, скажем n, которые пишут в Кассандре. Вот код, который делает это:

public void run(){ 
    LOGGER.log(Level.INFO, "Thread Created: " +Thread.currentThread().getName()); 
    while (!Thread.currentThread().isInterrupted()) { 
     Thread.yield(); 
     if (!content.isEmpty()) { 
      try { 
       JSONObject msg = content.remove(); 
       // JSON 
       for(String tableName : tableList){ 
        CassandraConnector.getSession().execute(createQuery(tableName, msg)); 
       } 
      } catch (Exception e) { 

      } 
     } 
    } 
} 

content является BlockingQueue используется для операций чтения-записи.

Я расширяю класс Thread при реализации потоковой передачи, и существует фиксированное количество потоков, которые продолжают выполнение, если не прерваны.

Проблема в том, что использование слишком большого количества CPU. Вот первая строка top команды:

PID USER  PR NI VIRT RES SHR S %CPU %MEM  TIME+ COMMAND 
46232 vishran+ 20 0 3010804 188052 14280 S 137.8 3.3 5663:24 java 

Здесь выход strace на волоске этого процесса:

strace -t -p 46322 
Process 46322 attached 
15:18:47 sched_yield()     = 0 
15:18:47 sched_yield()     = 0 
15:18:47 sched_yield()     = 0 
15:18:47 sched_yield()     = 0 
15:18:47 sched_yield()     = 0 
15:18:47 sched_yield()     = 0 
15:18:47 sched_yield()     = 0 
15:18:47 sched_yield()     = 0 
15:18:47 sched_yield()     = 0 
15:18:47 sched_yield()     = 0 
15:18:47 sched_yield()     = 0 
15:18:47 sched_yield()     = 0 
15:18:47 sched_yield()     = 0 
15:18:47 sched_yield()     = 0 
15:18:47 sched_yield()     = 0 
....and so on 

Почему я использую Thread.yield(), из-за this

Если вам нужна другая информация для отладки, сообщите мне.

Теперь вопрос в том, как минимизировать использование ЦП?

ответ

7

Вся цель BlockingQueue заключается в том, что она блокируется, когда она пуста. Таким образом, потребительские потоки (те, которые заполняются в Cassandra), не должны вручную проверять, пусты ли они.Вы можете просто сделать вызов take(), и если очередь пуста, вызов будет блокироваться, если он не будет прерван, или если есть доступный элемент.

Когда поток заблокирован, планировщик может назначить другой поток на своем месте, что избавит вас от вызова yield() и т. Д. Помните, что yield() уступит место другому потоку, только если для запуска доступен поток с приоритетом, большим или равным потоку, который дает доход.

public void run(){ 
    LOGGER.log(Level.INFO, "Thread Created: " +Thread.currentThread().getName()); 
    try { 
      JSONObject msg = content.take(); 
      // JSON 
      for(String tableName : tableList){ 
       CassandraConnector.getSession().execute(createQuery(tableName, msg)); 
      } 
    } catch (InterruptedException e) { 
      Thread.currentThread().interrupt(); 
    } 
} 
+0

, который это сделает! :) –

+0

Можете ли вы добавить фрагмент кода для этого. Я попробую блок в ответе @ Роберта. – vish4071

+0

Отправленный код snippet – Madhusudhan

3

С точки зрения вашего кода кажется, что ваши потребительские потоки всегда проверяют доступный контент. Таким образом, ваши потоки всегда работают и никогда не используются (ожидая, когда кто-нибудь их уведомит), поэтому ваш процессор всегда что-то делает, даже если он всегда дает поток текущий поток.

while (!Thread.currentThread().isInterrupted()) { Thread.yield(); if (!content.isEmpty()) {

Вы четко Тринг решить проблему производитель-потребитель, что многие из нас сталкиваются где-то над нашей карьеры программирования.

То, что вы сейчас делаете, заключается в том, что потребитель упреждательно постоянно проверяет, есть ли у него что-то, что нужно потреблять.

Наименьший и самый простой ресурсоемкие способ ее решения является:

  1. Есть производитель сигнал потребителю, что он произвел что-то.

Отъезд this example, поскольку он содержит простейший способ сделать это. Вы можете захотеть пересмотреть Java Concurrency in Practice для более глубокой помощи.

+1

Так что вы предлагаете сделать? – vish4071

+2

@ vish4071 Вы должны использовать 'BlockingQueue', поскольку он предназначен для использования: используйте один из методов блокировки: [' take() '] (http://docs.oracle.com/javase/8/docs/api /java/util/concurrent/BlockingQueue.html#take--) или ['poll (timeout, unit)'] (http://docs.oracle.com/javase/8/docs/api/java/util/concurrent /BlockingQueue.html#poll-long-java.util.concurrent.TimeUnit-) –

0

Как уже было описано в других ответов, которые вы выполняете активного ожидания вместо того, чтобы использовать центральную особенность вашей content BlockingQueue: ждать следующей записи и удалить его из очереди. Это делается с использованием метода take():

while (!Thread.currentThread().isInterrupted()) { 
    try { 
     JSONObject msg = content.take(); 
     for(String tableName : tableList){ 
      CassandraConnector.getSession().execute(createQuery(tableName, msg)); 
     } 
    } catch (Exception e) { 

    } 
} 

 Смежные вопросы

  • Нет связанных вопросов^_^