Мне нужно распараллелить существующую фоновое задание, чтобы вместо последовательного использования ресурсов «x» он параллельно заканчивал работу, используя только «y» потоков (y < < x). Эта задача постоянно работает в фоновом режиме и продолжает обрабатывать некоторые ресурсы.Ждите потоков демонов для завершения итерации с использованием службы-исполнителя
код структурирована следующим образом:
class BaseBackground implements Runnable {
@Override
public void run() {
int[] resources = findResources(...);
for (int resource : resources) {
processResource(resource);
}
stopProcessing();
}
public abstract void processResource(final int resource);
public void void stopProcessing() {
// Override by subclass as needed
}
}
class ChildBackground extends BaseBackground {
@Override
public abstract void processResource(final int resource) {
// does some work here
}
public void void stopProcessing() {
// reset some counts and emit metrics
}
}
Я изменил ChildBackground
следующим образом:
class ChildBackground extends BaseBackground {
private final BlockingQueue<Integer> resourcesToBeProcessed;
public ChildBackground() {
ExecutorService executorService = Executors.newFixedThreadPool(2);
for (int i = 0; i < 2; ++i) {
executorService.submit(new ResourceProcessor());
}
}
@Override
public abstract void processResource(final int resource) {
resourcesToBeProcessed.add(resource);
}
public void void stopProcessing() {
// reset some counts and emit metrics
}
public class ResourceProcessor implements Runnable {
@Override
public void run() {
while (true) {
int nextResource = resourcesToBeProcessed.take();
// does some work
}
}
}
}
Я не создающего и срывая ExecutorService каждый раз, потому что сбор мусора немного проблемы в моей службе. Хотя, я не понимаю, насколько это плохо, так как я не буду порождать более 10 потоков на каждой итерации.
Я не могу понять, как подождать, пока все ResourceProcessor
s закончит обработку ресурсов для одной итерации, чтобы я мог сбросить некоторые значения и исправить показатели в stopProcessing
. Я рассмотрел следующие варианты:
1) executorService.awaitTermination (тайм-аут). Это не будет работать, поскольку он всегда будет блокироваться до таймаута, потому что потоки ResourceProcessor
никогда не закончат работу
2) Я могу узнать количество ресурсов после findResources
и сделать его доступным для дочернего класса и каждый ResourceProcessor
увеличивает количество обработанных ресурсов. Мне нужно будет подождать, пока все ресурсы будут обработаны в stopProcessing
до сброса счетчиков. Мне нужно что-то вроде CountDownLatch, но вместо этого оно должно считаться UP
. В этом варианте будет много государственного управления, которого я не особо люблю.
3) Я мог бы обновить public abstract void processResource(final int resource)
, чтобы включить подсчет общих ресурсов и выполнить дочерний процесс, пока все потоки не обработают общие ресурсы. В этом случае также будет некоторое управление состоянием, но оно будет ограничено дочерним классом.
В любом из двух случаев, я должен будет добавить wait() & notify() логику, но я не уверен в моем подходе. Это то, что я имею:
class ChildBackground extends BaseBackground {
private static final int UNSET_TOTAL_RESOURCES = -1;
private final BlockingQueue<Integer> resourcesToBeProcessed;
private int totalResources = UNSET_TOTAL_RESOURCES;
private final AtomicInteger resourcesProcessed = new AtomicInteger(0);
public ChildBackground() {
ExecutorService executorService = Executors.newFixedThreadPool(2);
for (int i = 0; i < 2; ++i) {
executorService.submit(new ResourceProcessor());
}
}
@Override
public abstract void processResource(final int resource, final int totalResources) {
if (this.totalResources == UNSET_TOTAL_RESOURCES) {
this.totalResources = totalResources;
} else {
Preconditions.checkState(this.totalResources == totalResources, "Consecutive poll requests are using different total resources count, previous=%s, new=%s", this.totalResources, totalResources);
}
resourcesToBeProcessed.add(resource);
}
public void void stopProcessing() {
try {
waitForAllResourcesToBeProcessed();
} catch (InterruptedException e) {
e.printStackTrace();
}
resourcesProcessed.set(0);
totalResources = UNSET_TOTAL_RESOURCES;
// reset some counts and emit metrics
}
private void incrementProcessedResources() {
synchronized (resourcesProcessed) {
resourcesProcessed.getAndIncrement();
resourcesProcessed.notify();
}
}
private void waitForAllResourcesToBeProcessed() throws InterruptedException {
synchronized (resourcesProcessed) {
while (resourcesProcessed.get() != totalResources) {
resourcesProcessed.wait();
}
}
}
public class ResourceProcessor implements Runnable {
@Override
public void run() {
while (true) {
int nextResource = resourcesToBeProcessed.take();
try {
// does some work
} finally {
incrementProcessedResources();
}
}
}
}
}
Я не уверен, что при использовании AtomicInteger
правильный способ сделать это, и если да, то мне нужно вызвать ожидание() и уведомить(). Если я не использую wait() и notify(), мне даже не нужно выполнять все в синхронизированном блоке.
Пожалуйста, дайте мне знать ваши мысли об этом подходе, если я просто должен создать и выключить ExecutorService для каждой итерации или есть четвертый подход, который я должен преследовать.
Посмотрите на «Callables and Futures» на этой странице http://winterbe.com/posts/2015/04/07/java8-concurrency-tutorial-thread-executor-examples/ –
Фьючерсы не являются опцией потому что потоки исполнителей выполняются в замкнутом цикле, который останавливается только тогда, когда служба деактивирована. – user1071840
Будучи уведомленным о том, что задание, выполняемое исполнителем, является целью мысли фьючерсов. Конечно, вы не можете использовать, например, https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html#allOf-java.util.concurrent.CompletableFuture...- ? Вы отправляете одну партию ресурса исполнителям (накапливая 1 будущее по ресурсу), создаете будущее, которое ждет их объединения, и сделайте последнее вычисление ваших показателей. После представления этого последнего будущего вы могли бы забыть обо всех них. В реактивном стиле. – GPI