2013-08-30 2 views
4

Я использую инфраструктуру Java ExecutorService для отправки вызываемых задач для выполнения. Эти задачи взаимодействуют с веб-службой, и применяется тайм-аут веб-службы в течение 5 минут. Однако я видел, что в некоторых случаях тайм-аут игнорируется и поток «зависает» при вызове API, поэтому я хочу отменить все задачи, которые занимают больше времени, чем говорят, 5 минут.Лучшая практика для прерывания потоков, длина которых превышает пороговое значение

В настоящее время у меня есть список фьючерсов, и я перебираю их и вызываю future.get, пока все задачи не будут завершены. Теперь я видел, что перегруженный метод future.get принимает тайм-аут и выдает таймаут, когда задача не завершается в этом окне. Поэтому я подумал о подходе, в котором я делаю future.get() с таймаутом, и в случае TimeoutException я делаю future.cancel (true), чтобы убедиться, что эта задача прервана.

Мои главные вопросы
1. Получается ли с таймаутом лучший способ решить эту проблему?
2. Есть ли вероятность, что я жду с вызовом get на задачу, которая еще не была помещена в пул потоков (is not active worker). В этом случае я могу завершить поток, который, когда он начнет, может завершиться в течение необходимого срока?

Любые предложения будут глубоко оценены.

+0

Лучшая практика чтобы написать задания, чтобы они никогда не занимали слишком много времени, и их не нужно убивать извне. Остановка потока извне - это просто работа для нарушенной задачи. –

+0

http://stackoverflow.com/questions/16277191/executor-service-timeout-of-thread –

ответ

0

Получите с таймаутом лучший способ решить эту проблему?

Да, это нормально для get(timeout) на объекте Future, если задача, на которую указывает будущее, будет выполнена немедленно. Если задача еще не выполнена или выполняется, она будет ждать окончания таймаута и является хорошей практикой.

Есть ли вероятность того, что я жду с ГЭТ звонка по задаче , что hasnt еще помещенного на пуле потоков (разве активный работник)

Вы получаете Future объект только тогда, когда вы помещаете задачу в пул потоков, поэтому невозможно вызвать get() на задание, не помещая его в пул потоков. Да, есть вероятность, что задача еще не была занята свободным работником.

+0

Поскольку существует вероятность того, что задача не была занята свободным работником, я думаю, тогда мне нужно копать глубже возможно, индивидуальный подход - это был бы очень плохой сценарий, если бы я в конечном итоге отменил задачу, которая могла бы работать так, как ожидалось. – alf

+0

Можете ли вы сделать более ясно, что точная индивидуальная логика, которую вы хотите реализовать, чтобы я мог помочь. –

0

Подход, о котором вы говорите, в порядке. Но самое главное, прежде чем устанавливать пороговое значение таймаута, вам нужно знать, что является идеальным значением размера пула потоков и timiout для вашей среды. Проведите стресс-тестирование, которое покажет, является ли нить рабочих потоков, которые вы настроили как часть Threadpool, в порядке или нет. И это может даже уменьшить значение тайм-аута. Поэтому этот тест наиболее важен, я чувствую.

Время ожидания при получении отлично, но вы должны добавить, чтобы отменить задачу, если она выдает исключение TimeoutException. И если вы правильно выполните вышеуказанный тест и установите размер пула потоков и значение таймаута идеально, чем вам может даже не понадобиться отменить задания извне (но вы можете использовать это как резервное копирование). И да иногда при отмене задачи вы можете в конечном итоге отменить задачу, которая еще не подобрана Исполнителем.

+0

Я пробовал стресс-тестирование и определил значения для пула потоков и таймаута, которые должны охватывать мою среду. Это работает в 95% от env, но в некоторых случаях тайм-аут ws-вызовов просто не работает. Следовательно, моя попытка внешнего мониторинга и прекращения потоков, которые были активны в течение длительного времени и отказываются прекращать работу. Я согласен с тем, что решение проблемы тайм-аута ws решит мою проблему, но я боюсь, что это внешняя структура, и очень мало основополагающего механизма связи раскрывается. – alf

0

Конечно, Вы можете отменить задание с помощью

task.cancel (истина)

Это совершенно законно.Но это прервет поток, если это «RUNNING».

Если поток ожидает получения встроенной блокировки, тогда запрос «прерывания» не имеет никакого эффекта, кроме установки прерванного состояния потока. В этом случае вы не можете ничего сделать, чтобы остановить его. Для прерывания потока поток должен выходить из «заблокированного» состояния, приобретая замок, который он ожидал (что может занять более 5 минут). Это ограничение использования «встроенной блокировки».

Однако вы можете использовать явные классы блокировки для решения этой проблемы. Для этого вы можете использовать метод «lockInterruptibly» интерфейса «Lock». «lockInterruptibly» позволит потоку попытаться получить блокировку, оставаясь при этом зависящей от прерывания. Вот небольшой пример для достижения этой цели:

public void workWithExplicitLock()throws InterruptedException{ 
Lock lock = new ReentrantLock(); 
lock.lockInterruptibly()(); 
try { 
// work with shared object state 
} finally { 
lock.unlock(); 
} 

}

2
  1. ли получить тайм-ауту, лучший способ решить эту проблему?

    • Этого не достаточно. Например, если ваша задача не предназначена для ответа на прерывание, он будет продолжать работать или быть просто заблокирован
  2. Есть ли вероятность того, что я жду с самого начала звонить на задачу, которая hasnt еще была помещается в пул потоков (не является активным работником). В этом случае я могу завершить поток, который, когда он начнет, может завершиться в течение необходимого срока?

    • Да, Вы можете в конечном итоге отмена в качестве задачи, которая никогда не планируется работать, если ваш поток бассейн не настроен должным образом

Следующий фрагмент кода может быть одним из способов вы можете чтобы ваша задача реагировала на прерывание, когда ваша задача содержит блокировку без прерывания. Также он не отменяет задачу, запуск которой не запланирован. Идея заключается в том, чтобы переопределить метод прерывания и закрыть запущенные задачи, говорят закрывающие гнезда, соединения с базами данных и т.д. Этот код не является совершенным, и вам необходимо внести изменения в соответствии с требованиями, обработки исключений и т.д.

class LongRunningTask extends Thread { 
private Socket socket; 
private volatile AtomicBoolean atomicBoolean; 


public LongRunningTask() { 
    atomicBoolean = new AtomicBoolean(false); 
} 

@Override 
public void interrupt() { 
    try { 
     //clean up any resources, close connections etc. 
     socket.close(); 
    } catch(Throwable e) { 
    } finally { 
     atomicBoolean.compareAndSet(true, false); 
     //set the interupt status of executing thread. 
     super.interrupt(); 
    } 
} 

public boolean isRunning() { 
    return atomicBoolean.get(); 
} 

@Override 
public void run() { 
    atomicBoolean.compareAndSet(false, true); 
    //any long running task that might hang..for instance 
    try { 
     socket = new Socket("0.0.0.0", 5000); 
     socket.getInputStream().read(); 
    } catch (UnknownHostException e) { 
    } catch (IOException e) { 
    } finally { 

    } 
} 
} 
//your task caller thread 
//map of futures and tasks 
    Map<Future, LongRunningTask> map = new HashMap<Future, LongRunningTask>(); 
    ArrayList<Future> list = new ArrayList<Future>(); 
    int noOfSubmittedTasks = 0; 

    for(int i = 0; i < 6; i++) { 
     LongRunningTask task = new LongRunningTask(); 
     Future f = execService.submit(task); 
     map.put(f, task); 
     list.add(f); 
     noOfSubmittedTasks++; 
    } 

    while(noOfSubmittedTasks > 0) { 
     for(int i=0;i < list.size();i++) { 
      Future f = list.get(i); 
      LongRunningTask task = map.get(f); 
      if (task.isRunning()) { 
       /* 
       * This ensures that you process only those tasks which are run once 
       */ 
       try { 
        f.get(5, TimeUnit.MINUTES); 
        noOfSubmittedTasks--; 
       } catch (InterruptedException e) { 
       } catch (ExecutionException e) { 
       } catch (TimeoutException e) { 
              //this will call the overridden interrupt method 
        f.cancel(true); 
        noOfSubmittedTasks--; 
       } 
      } 

     } 
    } 
    execService.shutdown();