2

У меня есть ошибка, появившаяся дважды в производстве, когда один из моих вилочных/объединенных бассейнов перестает работать, хотя у него есть работа и больше работы добавляется.Нет рабочих потоков, когда fork-join работает?

Это заключение, к которому я пришел до сих пор, чтобы объяснить, почему очереди заданий выполняются, и поток результатов задачи останавливается. У меня есть потоки дампов, где потоки продлений моей задачи ждут завершения представления fork/join, но ни один рабочий поток ForkJoinPool не работает.

"calc-scheduling-pool-4-thread-2" #65 prio=5 os_prio=0 tid=0x00000000102e39f0 nid=0x794a in Object.wait() [0x00002ad900a06000] 
    java.lang.Thread.State: WAITING (on object monitor) 
    at java.lang.Object.wait(Native Method) 
    at java.util.concurrent.ForkJoinTask.externalAwaitDone(ForkJoinTask.java:334) 
    - locked <0x000000061ad08708> (a com.....Engine$Calculation) 
    at java.util.concurrent.ForkJoinTask.doJoin(ForkJoinTask.java:391) 
    at java.util.concurrent.ForkJoinTask.join(ForkJoinTask.java:719) 
    at java.util.concurrent.ForkJoinPool.invoke(ForkJoinPool.java:2613) 
    at com...Engine.calculateSinceLastBatch(Engine.java:141) 

Независимо от того, что я делаю, это не должно произойти правильно? Дамп потока выполняется через несколько часов после обнаружения начального условия. У меня есть два других ForkJoinPools во время выполнения, оба работают нормально с множеством рабочих потоков.

Параллелизм этого пула равен 1 (я знаю, что это глупо, но не должно нарушать правильность пула fork/join). Ошибок или исключений не обнаружено, пока моя очередь задач не заполнится, а в дампе потока не обнаружен рабочий.

Кто-нибудь еще видел это? Либо я что-то упускаю, либо есть ошибка в fork/join, которая никогда (re) не запускала рабочий поток для меня.

среды выполнения Java 8

обновления с кодом

Это разумное упрощение, как мы используем вилку/нарисуйте в производстве. У нас есть три двигателя, только один из которых сконфигурирован с параллелизмом 1.

import java.util.*; 
import java.util.concurrent.*; 
import java.util.concurrent.atomic.AtomicInteger; 
import java.util.stream.*; 

public class Engine { 

    BlockingQueue<Calculation> externalQueue = new LinkedBlockingQueue<>(100000); 
    ScheduledExecutorService scheduling = Executors.newScheduledThreadPool(3); 
    static ForkJoinPool forkJoin = new ForkJoinPool(1); 

    public static void main(String[] args) { 
     new Engine().start(); 
    } 

    void start() { 
     final AtomicInteger batch = new AtomicInteger(0); 
     // data comes in from external systems 
     scheduling.scheduleWithFixedDelay(
       () -> produceData(batch.getAndIncrement()), 
       500, 
       500, 
       TimeUnit.MILLISECONDS); 
     // internal scheduling processes data with a fixed delay 
     scheduling.scheduleWithFixedDelay(
       this::calculate, 
       1000, 
       1000, 
       TimeUnit.MILLISECONDS); 
    } 

    void produceData(final int batch) { 
     System.out.println(Thread.currentThread().getName() + " => submitting data for batch " + batch); 
     Stream<Integer> data = IntStream.range(0, 10).boxed(); 
     data.map((i) -> new Calculation(batch, i)).forEach(externalQueue::offer); 
    } 

    void calculate() { 
     int available = externalQueue.size(); 
     List<Calculation> tasks = new ArrayList<>(available); 
     externalQueue.drainTo(tasks); 
     // invoke will block for the results to be calculated before continuing 
     forkJoin.invoke(new CalculationTask(tasks, 0, tasks.size())); 
     System.out.println("done with calculations at " + new Date()); 
    } 

    static class CalculationTask extends RecursiveAction { 

     static int MIN_CALCULATION_THRESHOLD = 3; 

     List<Calculation> tasks; 
     int start; 
     int end; 

     CalculationTask(List<Calculation> tasks, int start, int end) { 
      this.tasks = tasks; 
      this.start = start; 
      this.end = end; 
     } 

     // if below a threshold, calculate here, else fork to new CalculationTasks 
     @Override 
     protected void compute() { 
      int work = end - start; 
      if (work <= threshold()) { 
       for (int i = start; i < end; i++) { 
        Calculation calc = tasks.get(i); 
        calc.calculate(); 
       } 
       return; 
      } 

      invokeNewActions(); 
     } 

     int threshold() { 
      return Math.max(tasks.size()/forkJoin.getParallelism()/2, MIN_CALCULATION_THRESHOLD); 
     } 

     void invokeNewActions() { 
      invokeAll(
        new CalculationTask(tasks, start, middle()), 
        new CalculationTask(tasks, middle(), end)); 
     } 

     int middle() { 
      return (start + end)/2; 
     } 
    } 

    static class Calculation { 

     int batch; 
     int data; 

     Calculation(int batch, int data) { 
      this.batch = batch; 
      this.data = data; 
     } 

     void calculate() { 
      // does some work and pushes results to a listener 
      System.out.println(Thread.currentThread().getName() + " => calculation complete on batch " + batch 
          + " for " + data); 
     } 
    } 

} 
+0

Что такое очередь? Какой выпуск, Java7 или 8? Также может помочь небольшой код. – edharned

+0

Извините, возможно, не было ясно. У меня большие задачи, идущие в очередь, которые сбрасываются с интервалами и передаются в fork/join, которые нужно разбить и выполнить. Производственная среда - это Java 8. Я могу попытаться предоставить некоторый код, но я подозреваю, что когда он сводится к тому, как мы используем fork/join, он будет похож на один из учебников. –

+2

Вы пытались установить параллелизм больше 1? Я знаю, что это не тот ответ, который вы ищете, но без теста, чтобы воспроизвести, вряд ли кто-то будет иметь твердый ответ. Ваши предположения верны, и в прошлом были странные вещи. –

ответ

0

Ожидание в java.util.concurrent.ForkJoinTask.externalAwaitDone (ForkJoinTask.java:334)

Это говорит мне что F/J может использовать вашу подающую нить в качестве рабочего. Следуйте за кодом из invokeAll. После выполнения задачи для выполнения код нуждается в будущем, и он заканчивается ((ForkJoinTask) futures.get (i)). QuietlyJoin(); спокойно Джойн идет делать.

Там, если (Thread.currentThread()) instanceof ForkJoinWorkerThread) не будет истинным, если пул использует ваш поток отправки в качестве рабочего, он попадает в внешнийAwaitDone().

Проблема может заключаться в том, что ваша подающая нить никогда не проснется, так как это не настоящий рабочий. Существует множество проблем с использованием подающего потока в качестве рабочего, и это может быть другое.

Как сказал Джон-Винт, без теста этот ответ является лишь предположением. Почему бы не установить параллелизм> 1 и не сделать с ним.