Кажется, что его можно написать вычисление типа «вилка/присоединиться к» без рамок ForkJoin (см использование отзывной ниже). Сам интерфейс ForkJoin, по-видимому, не имеет разницы в производительности, но, возможно, немного более аккуратный для кодирования, я предпочитаю использовать только Callables.
Я также исправил первоначальную попытку. Похоже, что порог был слишком мал по первоначальной попытке, поэтому он был медленным, я думаю, он должен быть как минимум размером с число ядер.
Я не уверен, что если использование ForkJoinPool будет быстрее для этого использования, мне нужно будет собрать больше статистики, я не думаю, что у меня нет никаких операций, которые блокируются в течение длительного времени.
public class Main {
static class FindMaxTask extends RecursiveTask<Double> {
private int threshold;
private double[] data;
private int startIndex;
private int endIndex;
public FindMaxTask(double[] data, int startIndex, int endIndex, int threshold) {
super();
this.data = data;
this.startIndex = startIndex;
this.endIndex = endIndex;
this.threshold = threshold;
}
@Override
protected Double compute() {
int diff = (endIndex-startIndex+1);
if (diff!=(this.data.length/threshold)){
int aStartIndex = startIndex;
int aEndIndex = startIndex + (diff/2) - 1;
int bStartIndex = startIndex + (diff/2);
int bEndIndex = endIndex;
FindMaxTask f1 = new FindMaxTask(this.data,aStartIndex,aEndIndex,threshold);
f1.fork();
FindMaxTask f2 = new FindMaxTask(this.data,bStartIndex,bEndIndex,threshold);
return Math.max(f1.join(),f2.compute());
} else {
double max = Double.MIN_VALUE;
for (int i = startIndex; i <= endIndex; i++) {
double n = data[i];
if (n > max) {
max = n;
}
}
return max;
}
}
}
static class FindMax implements Callable<Double> {
private double[] data;
private int startIndex;
private int endIndex;
private int threshold;
private ExecutorService executorService;
public FindMax(double[] data, int startIndex, int endIndex, int threshold, ExecutorService executorService) {
super();
this.data = data;
this.startIndex = startIndex;
this.endIndex = endIndex;
this.executorService = executorService;
this.threshold = threshold;
}
@Override
public Double call() throws Exception {
int diff = (endIndex-startIndex+1);
if (diff!=(this.data.length/this.threshold)){
int aStartIndex = startIndex;
int aEndIndex = startIndex + (diff/2) - 1;
int bStartIndex = startIndex + (diff/2);
int bEndIndex = endIndex;
Future<Double> f1 = this.executorService.submit(new FindMax(this.data,aStartIndex,aEndIndex,this.threshold,this.executorService));
Future<Double> f2 = this.executorService.submit(new FindMax(this.data,bStartIndex,bEndIndex,this.threshold,this.executorService));
return Math.max(f1.get(), f2.get());
} else {
double max = Double.MIN_VALUE;
for (int i = startIndex; i <= endIndex; i++) {
double n = data[i];
if (n > max) {
max = n;
}
}
return max;
}
}
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
double[] data = new double[1024*1024*64];
for (int i=0;i<data.length;i++){
data[i] = Math.random();
}
int p = Runtime.getRuntime().availableProcessors();
int threshold = p;
int threads = p;
Instant start = null;
Instant end = null;
ExecutorService es = null;
es = Executors.newFixedThreadPool(threads);
System.out.println("1. started..");
start = Instant.now();
System.out.println("max = "+es.submit(new FindMax(data,0,data.length-1,threshold,es)).get());
end = Instant.now();
System.out.println("Callable (recrusive), with fixed pool, Find Max took ms = "+ Duration.between(start, end).toMillis());
es = new ForkJoinPool();
System.out.println("2. started..");
start = Instant.now();
System.out.println("max = "+es.submit(new FindMax(data,0,data.length-1,threshold,es)).get());
end = Instant.now();
System.out.println("Callable (recursive), with fork join pool, Find Max took ms = "+ Duration.between(start, end).toMillis());
ForkJoinPool fj = new ForkJoinPool(threads);
System.out.println("3. started..");
start = Instant.now();
System.out.println("max = "+fj.invoke(new FindMaxTask(data,0,data.length-1,threshold)));
end = Instant.now();
System.out.println("RecursiveTask (fork/join framework),with fork join pool, Find Max took ms = "+ Duration.between(start, end).toMillis());
}
}
Надлежащим образом осуществленная перешеек и властвуй должны работать. –
"Использование только Executor.workStealingPool()" - это фасад. В эталонной реализации это [просто 'ForkJoinPool'] (http://grepcode.com/file/repository.grepcode.com/java/root/jdk/openjdk/8u40-b25/java/util/concurrent/Executors.java # Executors.newWorkStealingPool% 28% 29). Документация 'workStealingPool()' не очень исчерпывающая в отношении того, что на самом деле представляет собой «пул потоков обработки». Например, ваш код полагается на то, что в пуле будет создан 'Future', метод 'get()' поможет выполнить другие ожидающие задачи, истинные для 'ForkJoinPool', но все ли« пулы для обработки работы »поддерживают это? – Holger