У меня есть ошибка, появившаяся дважды в производстве, когда один из моих вилочных/объединенных бассейнов перестает работать, хотя у него есть работа и больше работы добавляется.Нет рабочих потоков, когда fork-join работает?
Это заключение, к которому я пришел до сих пор, чтобы объяснить, почему очереди заданий выполняются, и поток результатов задачи останавливается. У меня есть потоки дампов, где потоки продлений моей задачи ждут завершения представления fork/join, но ни один рабочий поток ForkJoinPool не работает.
"calc-scheduling-pool-4-thread-2" #65 prio=5 os_prio=0 tid=0x00000000102e39f0 nid=0x794a in Object.wait() [0x00002ad900a06000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
at java.util.concurrent.ForkJoinTask.externalAwaitDone(ForkJoinTask.java:334)
- locked <0x000000061ad08708> (a com.....Engine$Calculation)
at java.util.concurrent.ForkJoinTask.doJoin(ForkJoinTask.java:391)
at java.util.concurrent.ForkJoinTask.join(ForkJoinTask.java:719)
at java.util.concurrent.ForkJoinPool.invoke(ForkJoinPool.java:2613)
at com...Engine.calculateSinceLastBatch(Engine.java:141)
Независимо от того, что я делаю, это не должно произойти правильно? Дамп потока выполняется через несколько часов после обнаружения начального условия. У меня есть два других ForkJoinPools во время выполнения, оба работают нормально с множеством рабочих потоков.
Параллелизм этого пула равен 1 (я знаю, что это глупо, но не должно нарушать правильность пула fork/join). Ошибок или исключений не обнаружено, пока моя очередь задач не заполнится, а в дампе потока не обнаружен рабочий.
Кто-нибудь еще видел это? Либо я что-то упускаю, либо есть ошибка в fork/join, которая никогда (re) не запускала рабочий поток для меня.
среды выполнения Java 8
обновления с кодом
Это разумное упрощение, как мы используем вилку/нарисуйте в производстве. У нас есть три двигателя, только один из которых сконфигурирован с параллелизмом 1.
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.*;
public class Engine {
BlockingQueue<Calculation> externalQueue = new LinkedBlockingQueue<>(100000);
ScheduledExecutorService scheduling = Executors.newScheduledThreadPool(3);
static ForkJoinPool forkJoin = new ForkJoinPool(1);
public static void main(String[] args) {
new Engine().start();
}
void start() {
final AtomicInteger batch = new AtomicInteger(0);
// data comes in from external systems
scheduling.scheduleWithFixedDelay(
() -> produceData(batch.getAndIncrement()),
500,
500,
TimeUnit.MILLISECONDS);
// internal scheduling processes data with a fixed delay
scheduling.scheduleWithFixedDelay(
this::calculate,
1000,
1000,
TimeUnit.MILLISECONDS);
}
void produceData(final int batch) {
System.out.println(Thread.currentThread().getName() + " => submitting data for batch " + batch);
Stream<Integer> data = IntStream.range(0, 10).boxed();
data.map((i) -> new Calculation(batch, i)).forEach(externalQueue::offer);
}
void calculate() {
int available = externalQueue.size();
List<Calculation> tasks = new ArrayList<>(available);
externalQueue.drainTo(tasks);
// invoke will block for the results to be calculated before continuing
forkJoin.invoke(new CalculationTask(tasks, 0, tasks.size()));
System.out.println("done with calculations at " + new Date());
}
static class CalculationTask extends RecursiveAction {
static int MIN_CALCULATION_THRESHOLD = 3;
List<Calculation> tasks;
int start;
int end;
CalculationTask(List<Calculation> tasks, int start, int end) {
this.tasks = tasks;
this.start = start;
this.end = end;
}
// if below a threshold, calculate here, else fork to new CalculationTasks
@Override
protected void compute() {
int work = end - start;
if (work <= threshold()) {
for (int i = start; i < end; i++) {
Calculation calc = tasks.get(i);
calc.calculate();
}
return;
}
invokeNewActions();
}
int threshold() {
return Math.max(tasks.size()/forkJoin.getParallelism()/2, MIN_CALCULATION_THRESHOLD);
}
void invokeNewActions() {
invokeAll(
new CalculationTask(tasks, start, middle()),
new CalculationTask(tasks, middle(), end));
}
int middle() {
return (start + end)/2;
}
}
static class Calculation {
int batch;
int data;
Calculation(int batch, int data) {
this.batch = batch;
this.data = data;
}
void calculate() {
// does some work and pushes results to a listener
System.out.println(Thread.currentThread().getName() + " => calculation complete on batch " + batch
+ " for " + data);
}
}
}
Что такое очередь? Какой выпуск, Java7 или 8? Также может помочь небольшой код. – edharned
Извините, возможно, не было ясно. У меня большие задачи, идущие в очередь, которые сбрасываются с интервалами и передаются в fork/join, которые нужно разбить и выполнить. Производственная среда - это Java 8. Я могу попытаться предоставить некоторый код, но я подозреваю, что когда он сводится к тому, как мы используем fork/join, он будет похож на один из учебников. –
Вы пытались установить параллелизм больше 1? Я знаю, что это не тот ответ, который вы ищете, но без теста, чтобы воспроизвести, вряд ли кто-то будет иметь твердый ответ. Ваши предположения верны, и в прошлом были странные вещи. –