3

В принципе, существует несколько различных stretegies с обработкой исключений при использовании ThreadPoolExecutor:Statefull обработка исключений с ThreadPoolExecutor

  1. Thread.setUncaughtExceptionHandler() (и Thread.getDefaultUncaughtExceptionHandler())

    Исключение завернутые в будущем, поэтому UncaughtExceptionHandler никогда не вызывается, поэтому его нельзя использовать.

  2. Установка ThreadFactory Единственная соответствующая часть - Thread.setUncaughtExceptionHandler() в созданном потоке newley. Но это не будет иметь никакого эффекта, см. Стр. 1).

  3. Переопределение ThreadPoolExecutor.afterExecute()

    protected void afterExecute(Runnable r, Throwable t) { 
        super.afterExecute(r, t); 
        if (t == null && r instanceof Future<?>) { 
         try { 
           Object result = ((Future<?>) r).get(); 
          } catch (CancellationException ce) { 
           t = ce; 
          } catch (ExecutionException ee) { 
           t = ee.getCause(); 
          } catch (InterruptedException ie) { 
           Thread.currentThread().interrupt(); // ignore/reset 
          } 
        } 
        if (t != null){ 
         logger.error("ThreadPoolExecutor.afterExecute", t); 
        } 
    } 
    

    Этот подход работает почти. Если обработка исключений является безстоящей, то вам не нужно обращаться к состоянию вашей исходной задачи Runnable/Callable, это нормально. В statefull случае у вас нет доступа к исходной задаче (даже отражение не помогает, потому что Runnable выше не будет выполнять оригинальную задачу).

Как я могу обработать исключение, когда мне нужно получить доступ к исходной задаче?

ответ

2

Прежде всего, см. Handling Exceptions for ThreadPoolExecutor для получения дополнительной информации о проблеме с afterExecute().

ThreadPoolExecutor имеет

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) ; 

и

protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value); 

когда вызываемые, запускаемые ваши оригинальные задачи, которые вы можете украсить. Это основные стратегии. Ниже рабочего кода с использованием Spring (я удалил комментарий для clearity):

package org.springframework.scheduling.concurrent; 

import java.util.concurrent.BlockingQueue; 
import java.util.concurrent.Callable; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.LinkedBlockingQueue; 
import java.util.concurrent.RejectedExecutionHandler; 
import java.util.concurrent.RunnableFuture; 
import java.util.concurrent.SynchronousQueue; 
import java.util.concurrent.ThreadFactory; 
import java.util.concurrent.ThreadPoolExecutor; 
import java.util.concurrent.TimeUnit; 

import org.springframework.core.task.TaskDecorator; 
import org.springframework.scheduling.concurrent.ConcurrentTaskExecutor; 
import org.springframework.scheduling.concurrent.ExecutorConfigurationSupport; 
import org.springframework.scheduling.concurrent.ThreadPoolExecutorFactoryBean; 
import org.springframework.util.Assert; 

public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport implements AsyncListenableTaskExecutor, SchedulingTaskExecutor { 

    private final Object poolSizeMonitor = new Object(); 

    private int corePoolSize = 1; 

    private int maxPoolSize = Integer.MAX_VALUE; 

    private int keepAliveSeconds = 60; 

    private int queueCapacity = Integer.MAX_VALUE; 

    private boolean allowCoreThreadTimeOut = false; 
    //fix 
    private CallableTransform callableTransform; 

    private ThreadPoolExecutor threadPoolExecutor; 


    public void setCorePoolSize(int corePoolSize) { 
     synchronized (this.poolSizeMonitor) { 
      this.corePoolSize = corePoolSize; 
      if (this.threadPoolExecutor != null) { 
       this.threadPoolExecutor.setCorePoolSize(corePoolSize); 
      } 
     } 
    } 

    public int getCorePoolSize() { 
     synchronized (this.poolSizeMonitor) { 
      return this.corePoolSize; 
     } 
    } 

    public void setMaxPoolSize(int maxPoolSize) { 
     synchronized (this.poolSizeMonitor) { 
      this.maxPoolSize = maxPoolSize; 
      if (this.threadPoolExecutor != null) { 
       this.threadPoolExecutor.setMaximumPoolSize(maxPoolSize); 
      } 
     } 
    } 

    public int getMaxPoolSize() { 
     synchronized (this.poolSizeMonitor) { 
      return this.maxPoolSize; 
     } 
    } 

    public void setKeepAliveSeconds(int keepAliveSeconds) { 
     synchronized (this.poolSizeMonitor) { 
      this.keepAliveSeconds = keepAliveSeconds; 
      if (this.threadPoolExecutor != null) { 
       this.threadPoolExecutor.setKeepAliveTime(keepAliveSeconds, TimeUnit.SECONDS); 
      } 
     } 
    } 

    public int getKeepAliveSeconds() { 
     synchronized (this.poolSizeMonitor) { 
      return this.keepAliveSeconds; 
     } 
    } 

    public void setQueueCapacity(int queueCapacity) { 
     this.queueCapacity = queueCapacity; 
    } 

    public void setAllowCoreThreadTimeOut(boolean allowCoreThreadTimeOut) { 
     this.allowCoreThreadTimeOut = allowCoreThreadTimeOut; 
    } 


    //fix 
    public void setCallableDecorator(CallableDecorator callableDecorator) { 
     Assert.isNull(this.callableTransform, "You can' call setCallableDecorator() and setTaskDecorator() more than once"); 
     this.callableTransform = new CallableTransform(){ 

      @Override 
      public Callable<?> decorate(Object originalTask) { 
       Callable<?> ret = callableDecorator.decorate((Callable<?>)originalTask); 
       return ret; 
      } 

      @Override 
      public boolean isCallable(){ 
       return true; 
      } 
    }; 
} 

    //fix 
    public void setTaskDecorator(TaskDecorator taskDecorator) { 
     Assert.isNull(this.callableTransform, "You can' call setCallableDecorator() and setTaskDecorator() more than once"); 
     this.callableTransform = new CallableTransform(){ 

      @Override 
      public Callable<?> decorate(Object originalTask) { 
       Callable<?> ret= Executors.callable(taskDecorator.decorate((Runnable)originalTask)); 
       return ret; 
      } 

      @Override 
      public boolean isCallable(){ 
       return false; 
      } 
     }; 
    } 


    @Override 
    protected ExecutorService initializeExecutor(
      ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) { 

     BlockingQueue<Runnable> queue = createQueue(this.queueCapacity); 

     ThreadPoolExecutor executor; 

     //fix 
     if (this.callableTransform != null) { 
      executor = new ThreadPoolExecutor(
        this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS, 
        queue, threadFactory, rejectedExecutionHandler) { 

       @Override 
       protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { 
        if(callableTransform==null){ 
         return super.newTaskFor(callable); 
        } 

        Callable<?> wrapedCallable = null; 

        boolean isCallable = callableTransform.isCallable(); 
        if(isCallable){ 
         wrapedCallable = callableTransform.decorate(callable); 
        } else { 
         //callableTransform accepts Runnable, but we have Callable 
         throw new IllegalStateException("You use TaskDecorator, but submit Callable"); 
        } 

        @SuppressWarnings("unchecked") 
        Callable<T> param = (Callable<T>)wrapedCallable; 
        return super.newTaskFor(param); 
       } 

       @Override 
       protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { 
        if(callableTransform==null){ 
         return super.newTaskFor(runnable, value); 
        } 

        Callable<?> wrapedCallable = null; 

        boolean isRunnable = callableTransform.isRunnable(); 
        if(isRunnable){ 
         wrapedCallable = callableTransform.decorate(runnable); 
        } else { 
         //callableTransform accepts Callable, but we have Runnable 
         throw new IllegalStateException("You use CallableDecorator, but execute Runnable"); 
        } 

        @SuppressWarnings("unchecked") 
        Callable<T> param = (Callable<T>)wrapedCallable; 
        return super.newTaskFor(param); 
       } 


      }; 

     } else { 
      executor = new ThreadPoolExecutor(
        this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS, 
        queue, threadFactory, rejectedExecutionHandler); 

     } 

     if (this.allowCoreThreadTimeOut) { 
      executor.allowCoreThreadTimeOut(true); 
     } 

     this.threadPoolExecutor = executor; 
     return executor; 
    } 


    protected BlockingQueue<Runnable> createQueue(int queueCapacity) { 
     if (queueCapacity > 0) { 
      return new LinkedBlockingQueue<>(queueCapacity); 
     } 
     else { 
      return new SynchronousQueue<>(); 
     } 
    } 

    public ThreadPoolExecutor getThreadPoolExecutor() throws IllegalStateException { 
     Assert.state(this.threadPoolExecutor != null, "ThreadPoolTaskExecutor not initialized"); 
     return this.threadPoolExecutor; 
    } 

    public int getPoolSize() { 
     if (this.threadPoolExecutor == null) { 
      // Not initialized yet: assume core pool size. 
      return this.corePoolSize; 
     } 
     return this.threadPoolExecutor.getPoolSize(); 
    } 

    public int getActiveCount() { 
     if (this.threadPoolExecutor == null) { 
      // Not initialized yet: assume no active threads. 
      return 0; 
     } 
     return this.threadPoolExecutor.getActiveCount(); 
    } 


    @FunctionalInterface 
    public interface CallableDecorator { 
     <V> Callable<V> decorate(Callable<V> task); 
    } 

    @FunctionalInterface 
    static interface CallableTransform { 
     Callable<?> decorate(Object originalTask); 

     default boolean isCallable(){ 
      return true; 
     } 

     default boolean isRunnable(){ 
      return !isCallable(); 
     } 
    } 



    //rest of the code execute/submit override 
    //... 

    @Override 
    public boolean prefersShortLivedTasks() { 
     return true; 
    } 

} 

А использование пример является, как foolows:

ThreadPoolTaskExecutor threadPoolFactory = new ThreadPoolTaskExecutor(); 
    threadPoolFactory.setCorePoolSize(4); 
    threadPoolFactory.setMaxPoolSize(4); 
    threadPoolFactory.setKeepAliveSeconds(0); 


    CallableDecorator decorator = new CallableDecorator(){ 

     @Override 
     public <T> Callable<T> decorate(Callable<T> task) { 
      return() -> { 
       try { 
        return task.call(); 
       } 
       catch (Throwable e) { 
        synchronized (executor) { 
         if (!((MyRunnable) task).failSilent){ //note use of state of original Task 
          log.error("Execution Failure!", e); 
         } 
        } 
        throw e; 
       } 
      }; 
     } 
    }; 
    threadPoolFactory.setCallableDecorator(decorator); 

    threadPoolFactory.initialize(); 
    executor = threadPoolFactory.getThreadPoolExecutor(); 

и далее:

executor.submit(new MyCallable(true)); 
+0

Я буду рад, если кто-то поставит этот код на весну 5. – alexsmail

1

В принципе, представляет собой пару различных стратегий с обработкой исключений при использовании ThreadPoolExecutor:

Хотя вы можете переопределить ThreadPoolExecutor.beforeExecute(...), выкопайте свою управляемую, которая находится там через отражение, установите ThreadLocal, а затем используйте ее в afterExecute(...), это действительно похоже на взломать и очень зависит от реализации TPE.

Я бы вместо этого обернул ваши методы Runnable или Callable в обертке ошибки журнала проб/ошибок.Так что вы бы добавить вещи в пул потоков с чем-то вроде:

threadPool.submit(new RunnableWrapper(myRunnable)); 
// or 
threadPool.submit(new CallableWrapper(myCallable)); 

Они будут иметь/улов/механизм регистрации TRY, а также иметь доступ к Runnable для оценки состояния. Что-то еще похоже на меня.

Вы можете наверняка переопределить методы submit(...), чтобы сделать обертку заданий самостоятельно. Это кажется намного более чистым.

+0

* Использование RunnableWrapper влияет на то, что я достигаю с помощью своего решения, я украшаю myRunnable. Разница в том, что я передаю RunnableWrapper в фазе конструктора и в любой фазе отправки. Вы также можете переопределить метод отправки (который является общедоступным) и не защищенный метод как альтернативный. Но в таком случае вы должны переопределить многие такие методы, и я думаю, что риск разрыва будущего TPE выше, но последний момент спорный. * Службы Исполнителя не подходят в моем случае. В моей обработке исключений используется состояние myRunnable. Я не могу этого добиться. – alexsmail

+1

Я изменил свой ответ @alexsmail, подумав об этом еще немного. – Gray

+0

Хорошо, учтите, что у вас есть такой метод, как invokAll(), который вы должны переопределить. Если вы пойдете в мою сторону, используя переопределить newTaskFor(), он будет «просто работать» (он использует newTaskFor() внутренне). Обертывание себя, безусловно, своего рода решение, но не так чисто, как мой подход. Кроме того, вы создаете как можно обертки как вызываемые/выполняемые объекты. Более того, я считаю, что правильная обработка исключений должна быть закодирована с помощью другого обратного вызова в время создания ThreadPool (как это должно быть сделано с ThreadFactory). – alexsmail