2015-09-11 3 views
2

У меня есть следующий код, чтобы найти сумму натуральных чисел от 1 до 5000. Это простое упражнение для практики параллелизма.Основная нить случайно не достигает конца (пытаясь суммировать натуральные числа в параллельных потоках)

public static void main(String[] args) throws InterruptedException { 
     final int[] threadNb = new int[] {5}; 
     final Integer[] result = new Integer[1]; 
     result[0] = 0; 
     List<Thread> threads = new LinkedList<>(); 
     IntStream.range(0, threadNb[0]).forEach(e -> { 
      threads.add(new Thread(() -> { 
       int sum = 0; 
       int idx = e * 1000 + 1; 
       while (!Thread.interrupted()) { 
        if (idx <= (e + 1) * 1000) { 
         sum += idx++; 
        } else { 
         synchronized(result) { 
          result[0] += sum; 
          System.err.println("sum found (job " + e + "); sum=" + sum + "; result[0]=" + result[0] + "; idx=" + idx); 
          Thread.currentThread().interrupt(); 
         } 
        } 
       } 
       synchronized(result) { 
        System.err.println("Job " + e + " done. threadNb = " + threadNb[0]); 
        threadNb[0]--; 
        System.err.println("threadNb = " + threadNb[0]); 
       } 
      })); 
     }); 
     threads.forEach(Thread::start); 
     //noinspection StatementWithEmptyBody 
     while(threadNb[0] > 0); 
     System.out.println("begin result"); 
     System.out.println(result[0]); 
     System.out.println("end result"); 
    } 

Иногда, когда я запускаю код, последние 3 System.out.println() не отображаются. Если я поставлю заявление в while(threadNb[0] > 0), как и еще System.out.println(), моя проблема никогда не повторится.

Может ли кто-нибудь объяснить мне такое поведение?

Заранее спасибо за любую помощь

ответ

3

ничего о том, как объявляется переменная threadNb сообщает JVM, что ему необходимо сделать обновления для него видимым для других потоков. В какой момент обновления вашей переменной становятся видимыми для других потоков, полностью зависит от реализации JVM, она может сделать их видимыми или нет в зависимости от обстоятельств. Кроме того, JIT может свободно переупорядочивать или оптимизировать код, если он думает, что ему это удастся, и оно основывает свои решения на правилах видимости. Поэтому сложно сказать, что здесь происходит, потому что поведение остается неопределенным спецификациями языка Java, но определенно у вас возникла проблема, когда обновления ваших рабочих потоков обычно не видны основным потоком.

Если вы замените массивы AtomicInteger, то обновления гарантированно станут видимыми для других потоков. (Volatile работает тоже, но AtomicInteger предпочтителен. Чтобы использовать volatile, вы должны сделать переменную экземпляр или член класса.) Если вы сохраните обновленные значения в локальной переменной, вам не потребуется синхронизация:

import java.util.*; 
import java.util.stream.*; 
import java.util.concurrent.atomic.*;  
public class SumNumbers { 
    public static void main(String[] args) throws InterruptedException { 
     AtomicInteger threadNb = new AtomicInteger(5); 
     AtomicInteger result = new AtomicInteger(0); 
     List<Thread> threads = new LinkedList<>(); 
     IntStream.range(0, threadNb.intValue()).forEach(e -> { 
      threads.add(new Thread(() -> { 
       int sum = 0; 
       int idx = e * 1000 + 1; 
       while (!Thread.currentThread().isInterrupted()) { 
        if (idx <= (e + 1) * 1000) { 
         sum += idx++; 
        } else { 
         int r = result.addAndGet(sum); 
         System.out.println("sum found (job " + e + "); sum=" 
         + sum + "; result=" + r 
         + "; idx=" + idx); 
         Thread.currentThread().interrupt(); 
        } 
       } 
       System.out.println("Job " + e + " done."); 
       int threadNbVal = threadNb.decrementAndGet(); 
       System.out.println("Job " + e + " done, threadNb = " + threadNbVal); 
      })); 
     }); 
     threads.forEach(Thread::start); 
     //noinspection StatementWithEmptyBody 
     while(threadNb.intValue() > 0); 
     System.out.println("result=" + result.intValue()); 
    } 
} 

, где вы можете видеть обновления, становятся видимыми.

Занятое ожидание не является предпочтительным, так как оно отнимает процессорные циклы. Вы видите это в незаблокированном программировании, но здесь это нехорошо. Thread # join будет работать, или вы можете использовать CountdownLatch.

Помните, что использование Thread#interrupted() устраняет прерванный флаг. Обычно он используется, когда вы собираетесь бросить InterruptedException, в противном случае лучше использовать Thread.currentThread().isInterrupted(). Это не повредит ничего в этом конкретном случае, потому что тест цикла while - это единственное, что используется с флагом, поэтому вопрос о том, очищается ли он, не имеет значения.

2

Наиболее вероятным объяснением является то, что компилятор оптимизирует код таким образом, что значение threadNb[0] кэшируется. Поэтому основной поток может не видеть обновления, сделанные другими потоками. Создание вашего счетчика volatile может помочь решить эту проблему.

Однако текущий подход с оживленным ожиданием, как правило, не является лучшим решением. Вы должны использовать метод join() класса Thread, чтобы ваш основной поток подождал, пока каждый из них не закончится.

Например:

for(Thread t: threads) { 
    try{ 
     t.join(); 
    } catch(InterruptedException e) {} 
}