2016-02-23 3 views
3

У меня есть очередь запущенных потоков и вы хотите выставить некоторые из ее данных во время ее выполнения, чтобы отслеживать процесс.Как получить доступ к текущим потокам внутри ThreadPoolExecutor?

ThreadPoolExecutor обеспечивает доступ к своей очереди, и я могу перебирать эти объекты, чтобы вызвать мой переопределенный метод toString(), но это только те потоки, которые ждут выполнения.

Есть ли способ получить доступ к тем, которые в настоящее время работают для вызова моего метода? Или, может быть, есть лучший подход к этой задаче в целом?

Чтобы уточнить немного больше о цели, вот некоторый код общей идеи:

public class GetDataTask implements Runnable { 
    private String pageNumber; 
    private int dataBlocksParsed; 
    private String source; 
    private String dataType; 


    public GetDataTask(String source, String dataType) { 
     this.source = source; 
     this.dataType = dataType; 
    } 

    @Override 
    public void run() { 
     //do stuff that affects pageNumber and dataBlocksParsed 
    } 

    @Override 
    public String toString() { 
     return "GetDataTask{" + 
      "source=" + source + 
      ", dataType=" + dataType + 
      ", pageNumber=" + pageNumber + 
      ", dataBlocksParsed=" + dataBlocksParsed + 
      '}'; 
    } 
} 

и класс проведения исполнителя:

public class DataParseManager { 
    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(100, 100, 20, TimeUnit.SECONDS, new ArrayBlockingQueue<>(300)); 

    public void addParseDataTask(String source, String dataType) { 
     executor.execute(new GetDataTask(source, dataType)); 
    } 

    // here's the method that I need 
    public String getInfo() { 
     StringBuilder info = new StringBuilder(); 
     //and here's the method that I'm missing - executor.getActiveThreads() 
     for (Runnable r : executor.getActiveThreads()) { 
      info.append(((GetDataTask) r).toString()).append('\n'); 
     } 
     return info.append(executor.toString()).toString(); 
    } 
} 
+0

Тема или задача? Есть разница! Из любой исполняющей задачи вы можете использовать 'Thread.currentThread', чтобы перейти к Thread, который его выполняет, и получить информацию. И вы можете, конечно, сохранить ссылки на все представленные задачи, чтобы извлекать из них информацию. – Fildor

+0

Почему у вас есть очередь выполнения потоков? Вы имеете в виду, что у вас есть пул потоков? Если вы хотите следить за тем, что делают ваши задачи, я предлагаю вам периодически обновлять некоторые задания о том, что они делают, чтобы вы могли контролировать это. –

+0

О, только что понял, я, вероятно, неправильно понял вопрос. Вы уже используете ExecutorService? То, что я написал выше, имеет смысл только тогда. – Fildor

ответ

2

Как насчет обертывания Пробег как этот.

static class MonitorRunnable implements Runnable { 

    static final List<Runnable> activeTasks = Collections.synchronizedList(new ArrayList<>()); 

    private final Runnable runnable; 

    public MonitorRunnable(Runnable runnable) { 
     this.runnable = runnable; 
    } 

    @Override 
    public void run() { 
     activeTasks.add(runnable); 
     runnable.run(); 
     activeTasks.remove(runnable); 
    } 
} 

и

public class DataParseManager { 
    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(100, 100, 20, TimeUnit.SECONDS, new ArrayBlockingQueue<>(300)); 

    public void addParseDataTask(String source, String dataType) { 
     executor.execute(new MonitorRunnable(new GetDataTask(source, dataType))); 
    } 

    // here's the method that I need 
    public String getInfo() { 
     StringBuilder info = new StringBuilder(); 
     //and here's the method that I'm missing - executor.getActiveThreads() 
     synchronized (MonitorRunnable.activeTasks) { 
      for (Runnable r : MonitorRunnable.activeTasks) { 
       info.append(((GetDataTask) r).toString()).append('\n'); 
      } 
     } 
     return info.append(executor.toString()).toString(); 
    } 
} 
+0

Не получил это сначала, но теперь я вижу. Интересный подход, я попробую. Спасибо! –

+0

Итак, я закончил с этим вариантом, потому что он выглядел быстрее, чтобы применить к моему существующему коду и почувствовал себя немного более элегантным. Пока работает хорошо. Также добавлено 'toString()' to 'MonitorRunnable', чтобы увидеть данные задачи из очереди пула. Спасибо всем! И особое спасибо @Fildor за дополнительные усилия по этому поводу. :) –

2

Всякий раз, когда вы добавляете поток в очередь, также добавьте его во вторую структуру данных, скажем, HashSet. Затем, если вам нужно получить доступ к работающему потоку, вы можете проверить очередь ExecutorService, чтобы найти Threads, которые все еще ожидают выполнения: каждый поток в вашем HashSet, который еще не находится в очереди ExecutorService, запущен.

+0

Томас, спасибо за предложение. Я редактировал оригинальный вопрос, но ваш подход должен работать в любом случае. За исключением того, что я не уверен, как правильно управлять законченными потоками. И мне нужно будет удалить его вручную из набора. Но если нет более элегантного способа ... –

+0

@and_rew Темы ThreadPoolExecutor не заканчиваются так, как при создании Thread и запускать его. Они будут повторно использованы. То, что закончится, будет исполняемым или вызываемым, которое вы отправляете в очередь для выполнения. – Fildor

+0

Вам не обязательно «слушать» темы для завершения. Вместо этого, когда вы хотите получить доступ к работающим потокам, выполните итерацию через «HashSet», игнорируйте те, которые находятся в очереди ExecutorService, а также пропустите те (и удалите их), которые завершены. – Thomas

1

Как я писал в комментарии. Я хотел бы сделать активное обновление на общедоступный объект статистики подход:

Я изменил бы задачу так:

public class GetDataTask implements Runnable { 
    private String pageNumber; 
    private int dataBlocksParsed; 
    private String source; 
    private String dataType; 
    HashMap<GetDataTask,String> statistics 


    public GetDataTask(String source, String dataType, HashMap<GetDataTask,String> statistics) { 
     this.source = source; 
     this.dataType = dataType; 
     this.statistics = statistics; 
    } 

    @Override 
    public void run() { 
     // you'll probably want to immediately have stats available: 
     statistics.put(this, this.toString()); 

     //do stuff that affects pageNumber and dataBlocksParsed 
     // vv this will probably be inside your "do stuff" loop 
     statistics.put(this, this.toString()); 
     // loop end 

     // if you do not want stats of finished tasks, remove "this" here. 
    } 

    @Override 
    public String toString() { 
     return "GetDataTask{" + 
      "source=" + source + 
      ", dataType=" + dataType + 
      ", pageNumber=" + pageNumber + 
      ", dataBlocksParsed=" + dataBlocksParsed + 
      '}'; 
    } 
} 

и менеджер:

public class DataParseManager { 
    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(100, 100, 20, TimeUnit.SECONDS, new ArrayBlockingQueue<>(300)); 

    private HashMap<GetDataTask,String> stats = new ConcurrentHashMap<GetDataTask,String>();  

    public void addParseDataTask(String source, String dataType) { 
     executor.execute(new GetDataTask(source, dataType, stats)); 
    } 

    // here's the method that I need 
    public String getInfo() { 
     StringBuilder info = new StringBuilder(); 
     //and here's the method that I'm missing - executor.getActiveThreads() 

     // >>> iterate "stats"'s values to build the info string ...    

     return info.append(executor.toString()).toString(); 
    } 
} 

UPDATE

Вы можете легко изменить этот подход, чтобы потянуть информацию, итерации keys карты (которые являются выполняемыми задачами) и вызвать на них toString. Однако это очень похоже на подход саки. Может быть, вы чувствуете себя более комфортно с ним.

0

Поскольку у вас есть контроль над используемым исполнителем, я бы использовать ThreadPoolExecutor «s beforeExecute и afterExecute метод для отслеживания запущенных задач и использовать, чтобы создать метод getActiveTasks.

import java.util.Set; 
import java.util.concurrent.*; 

public class ActiveTasksThreadPool extends ThreadPoolExecutor { 

    private final ConcurrentHashMap<Runnable, Boolean> activeTasks = new ConcurrentHashMap<>(); 

    public ActiveTasksThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { 
     super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); 
    } 

    @Override 
    protected void beforeExecute(Thread t, Runnable r) { 

     activeTasks.put(r, Boolean.TRUE); 
     super.beforeExecute(t, r); 
    } 

    @Override 
    protected void afterExecute(Runnable r, Throwable t) { 

     super.afterExecute(r, t); 
     activeTasks.remove(r); 
    } 

    public Set<Runnable> getActiveTasks() { 
     // the returned set will not throw a ConcurrentModificationException. 
     return activeTasks.keySet(); 
    } 

    public static void main(String[] args) { 

     final int maxTasks = 5; 
     ActiveTasksThreadPool tp = new ActiveTasksThreadPool(maxTasks, maxTasks, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); 
     try { 
      System.out.println("Active tasks: " + tp.getActiveTasks()); 
      final CountDownLatch latch = new CountDownLatch(1); 
      for (int i = 0; i < maxTasks; i ++) { 
       final int rnumber = i; 
       tp.execute(new Runnable() { 
        @Override 
        public void run() { 
         try { latch.await(); } catch (Exception e) { 
          e.printStackTrace(); 
         } 
        } 
        @Override 
        public String toString() { 
         return "Runnable " + rnumber; 
        } 
       }); 
      } 
      Thread.sleep(100L); // give threads a chance to start 
      System.out.println("Active tasks: " + tp.getActiveTasks()); 
      latch.countDown(); 
      Thread.sleep(100L); // give threads a chance to finish 
      System.out.println("Active tasks: " + tp.getActiveTasks()); 
     } catch (Exception e) { 
      e.printStackTrace(); 
     } finally { 
      tp.shutdownNow(); 
     } 
    } 

} 

 Смежные вопросы

  • Нет связанных вопросов^_^