2016-06-19 2 views
2

Я работаю над некоторым устаревшим кодом, который я не могу реорганизовать немедленно.Использование опроса с isDone и отмена на Java Будущее вместо блокировки get

В этом коде используется блокировка Java Future. Он использует future.get (withTimeOut, ...). Это означает, что нам нужно, чтобы пул потоков приличного размера был достаточно отзывчивым. Поскольку звонки будут заблокированы, пока они не закончатся или не выйдут из игры.

Вопрос: Я собирался захватить будущее и поместить его в структуру данных, которая будет знать о начале выполнения задачи. Затем создайте выделенный поток или пул, который будет циклически перебирать структуру данных и проверяет, превышено ли future.isDone или превышено лимит времени ожидания. Если да, он может либо получить результат, либо отменить выполнение. Таким образом, потребуется не много потоков. Будет ли это правильной реализацией или вообще не рекомендуется?

Заранее спасибо.

Edit:

Просто, чтобы обеспечить больше контекста. Эти потоки используются для входа в службу нисходящего потока. Мы действительно не заботимся об ответе, но мы не хотим, чтобы соединение зависло. Таким образом, нам нужно захватить будущее и обеспечить его отмену или отсрочку.

Вот базовое моделирование, которое я написал после того, как задал вопрос.

@Component 
public class PollingService { 

    private ExecutorService executorService = Executors.newFixedThreadPool(1); 
    PoorMultiplexer poorMultiplexer = new PoorMultiplexer(); 
    private ConcurrentMap<Integer, Map<Future, Long>> futures = new ConcurrentHashMap<>(); 

    public void startHandler(){ 
     Thread handler = new Thread(new Runnable() { 
      @Override 
      public void run() { 
       while(true){ 
        try { 
         //This should be handled better. If there is not anything stop and re-start it later. 
         Thread.sleep(200); 
        } catch (InterruptedException e) { 
         e.printStackTrace(); 
        } 
        for(Iterator<ConcurrentMap.Entry<Integer, Map<Future, Long>>> it = futures.entrySet().iterator(); it.hasNext();){ 
         ConcurrentMap.Entry<Integer, Map<Future, Long>> entry = it.next(); 
         Map<Future, Long> futureMap = entry.getValue(); 
         boolean isProcessed = false; 
         if(futureMap.keySet().iterator().next().isDone()){ 
          //mark completed 
          isProcessed = true; 
         } 

         if(futureMap.values().iterator().next() < (300 + System.currentTimeMillis()) && !isProcessed){ 
          //cancel 
          futureMap.keySet().iterator().next().cancel(true); 
          isProcessed = true; 
         } 

         if(isProcessed){ 
          futures.remove(entry.getKey()); 
          System.out.println("Completed : " + entry.getKey()); 
         } 

        } 

        System.out.println("Run completed"); 
       } 
      } 
     }); 

     handler.start(); 
    } 

    public void run(int i) throws InterruptedException, ExecutionException{ 
     System.out.println("Starting : " + i); 

     poorMultiplexer.send(new Runnable() { 
      @Override 
      public void run() { 
       long startTime = System.currentTimeMillis(); 
       Future future = poorMultiplexer.send(execute()); 

       Map<Future, Long> entry = new HashMap<>(); 
       entry.put(future, startTime); 
       futures.put(i, entry); 
       System.out.println("Added : " + i); 
      } 
     }); 
    } 

    public void stop(){ 
     executorService.shutdown(); 
    } 

    public Runnable execute(){ 
     Worker worker = new Worker(); 
     return worker; 
    } 
} 


//This is a placeholder for a framework 
class PoorMultiplexer { 
    private ExecutorService executorService = Executors.newFixedThreadPool(20); 

    public Future send(Runnable task){ 
     return executorService.submit(task); 
    } 
} 


class Worker implements Runnable{ 

    @Override 
    public void run() { 
     //service call here 
    } 

} 
+0

Я думаю, что ваш вопрос слишком широк; при запросе проектной помощи ответы, как правило, «мнения»; так как любое решение имеет свои плюсы и минусы. И быть действительно точным: мы не можем сказать, будет ли ваша реализация корректной **. От 10000 футов выше ваша идея звучит разумно; но опять же: у нас мало деталей. Возможно, было бы лучше набросать немного UML ... и затем поговорить с вашими коллегами, которые знают приложение и получают их вход. Просить других людей, которые действительно не знают ваш код, о том, какое изменение имеет смысл ... просто не может привести к большим энсерам. – GhostCat

+0

@ Jägermeister Я чувствую, что это общая проблема с блокировкой вызовов. Я внедрил образец после запроса здесь, он работает, но требует дополнительной обработки. Вы правы, я должен был предоставить дополнительную информацию. –

ответ

1

Асинхронного опрос набор фьючерсов с помощью отдельного потока звучит как разумное осуществление для меня. Тем не менее, если вы можете добавить зависимость от библиотеки, вам может быть проще переключиться на Guava ListenableFuture, так как Guava предоставляет множество утилит для выполнения асинхронной работы.

+0

Да, я прочитал некоторые из этих вещей, но, к сожалению, их будет сложно добавить. Спасибо за ваш ответ. –