2013-08-07 2 views
1

У меня есть родительский поток, который отправляет сообщения в MQ и управляет ThreadPoolExecutor для рабочих потоков, которые прослушивают MQ и записывают сообщение в выходной файл. Я управляю threadpool размером 5. Поэтому, когда я запускаю свою программу, у меня есть 5 файлов с сообщениями. Все работает нормально, пока здесь. Теперь мне нужно объединить эти 5 файлов в родительский поток.Как мы знаем, что threadPoolExecutor завершил выполнение

Как узнать законченную обработку ThreadPoolExecutor, чтобы я мог начать слияние файлов.

public class ParentThread { 
    private MessageSender messageSender; 
    private MessageReciever messageReciever; 
    private Queue jmsQueue; 
    private Queue jmsReplyQueue; 
    ExecutorService exec = Executors.newFixedThreadPool(5); 

    public void sendMessages() { 
     System.out.println("Sending"); 
     File xmlFile = new File("c:/filename.txt"); 
     List<String> lines = null; 
     try { 
      lines = FileUtils.readLines(xmlFile, null); 
     } catch (IOException e) { 
      e.printStackTrace(); 
     } 
     for (String line : lines){ 
      messageSender.sendMessage(line, this.jmsQueue, this.jmsReplyQueue); 
     } 
     int count = 0; 
     while (count < 5) { 
      messageSender.sendMessage("STOP", this.jmsQueue, this.jmsReplyQueue); 
      count++; 
     } 

    } 

    public void listenMessages() { 
     long finishDate = new Date().getTime(); 
     for (int i = 0; i < 5; i++) { 
      Worker worker = new Worker(i, this.messageReciever, this.jmsReplyQueue); 
      exec.execute(worker); 
     } 
     exec.shutdown(); 

     if(exec.isTerminated()){ //PROBLEM is HERE. Control Never gets here. 
      long currenttime = new Date().getTime() - finishDate; 
      System.out.println("time taken: "+currenttime); 
      mergeFiles(); 
     } 
    } 
} 

Это мой рабочий класс

public class Worker implements Runnable { 

    private boolean stop = false; 
    private MessageReciever messageReciever; 
    private Queue jmsReplyQueue; 
    private int processId; 
    private int count = 0; 

    private String message; 
    private File outputFile; 
    private FileWriter outputFileWriter; 

    public Worker(int processId, MessageReciever messageReciever, 
      Queue jmsReplyQueue) { 
     this.processId = processId; 
     this.messageReciever = messageReciever; 
     this.jmsReplyQueue = jmsReplyQueue; 
    } 

    public void run() { 
     openOutputFile(); 
     listenMessages(); 
    } 


    private void listenMessages() { 
     while (!stop) { 
      String message = messageReciever.receiveMessage(null,this.jmsReplyQueue); 
      count++; 
      String s = "message: " + message + " Recieved by: " 
        + processId + " Total recieved: " + count; 
      System.out.println(s); 
      writeOutputFile(s); 
      if (StringUtils.isNotEmpty(message) && message.equals("STOP")) { 
       stop = true; 
      } 
     } 
    } 

    private void openOutputFile() { 
     try { 
      outputFile = new File("C:/mahi/Test", "file." + processId); 
      outputFileWriter = new FileWriter(outputFile); 
     } catch (IOException e) { 
      System.out.println("Exception while opening file"); 
      stop = true; 
     } 
    } 

    private void writeOutputFile(String message) { 
     try { 
      outputFileWriter.write(message); 
      outputFileWriter.flush(); 
     } catch (IOException e) { 
      System.out.println("Exception while writing to file"); 
      stop = true; 
     } 
    } 
} 

Как я буду знать, когда ThreadPool закончит обработку, так что я могу сделать мой другой очистить работу?

Благодаря

ответ

2

Что-то вроде этого:

exec.shutdown(); 

    // waiting for executors to finish their jobs 
    while (!exec.awaitTermination(50, TimeUnit.MILLISECONDS)); 

    // perform clean up work 
+1

Или удалите время и подождите несколько лет ... – assylias

+0

Спасибо SilverHaze. Именно то, что я искал. –

3

Если вы рабочий класс реализует отзывной вместо Runnable, то вы были бы в состоянии видеть, когда ваши темы полные с помощью будущего объекта, чтобы увидеть, если Thread вернул некоторый результат (например, boolean, который скажет вам, закончилось ли это выполнение или нет).

Посмотрите в разделе «8. Фьючерсы и вызываемых объектов» @ сайт ниже, он имеет именно то, что вам нужно имо: http://www.vogella.com/articles/JavaConcurrency/article.html

Edit: Так ведь все фьючерсы указывают, что их соответствующее исполнение отзывной является полным , его безопасно предположить, что ваш исполнитель завершил выполнение и может быть выключен/завершен вручную.

0

Вы можете использовать поток для мониторинга ThreadPoolExecutor как тот

import java.util.concurrent.ThreadPoolExecutor; 

public class MyMonitorThread implements Runnable { 
    private ThreadPoolExecutor executor; 

     private int seconds; 

     private boolean run=true; 

     public MyMonitorThread(ThreadPoolExecutor executor, int delay) 
     { 
      this.executor = executor; 
      this.seconds=delay; 
     } 

     public void shutdown(){ 
      this.run=false; 
     } 

     @Override 
     public void run() 
     { 
      while(run){ 
        System.out.println(
         String.format("[monitor] [%d/%d] Active: %d, Completed: %d, Task: %d, isShutdown: %s, isTerminated: %s", 
          this.executor.getPoolSize(), 
          this.executor.getCorePoolSize(), 
          this.executor.getActiveCount(), 
          this.executor.getCompletedTaskCount(), 
          this.executor.getTaskCount(), 
          this.executor.isShutdown(), 
          this.executor.isTerminated())); 
        try { 
         Thread.sleep(seconds*1000); 
        } catch (InterruptedException e) { 
         e.printStackTrace(); 
        } 
      } 

     } 

} 

И добавить

MyMonitorThread monitor = new MyMonitorThread(executorPool, 3); 
      Thread monitorThread = new Thread(monitor); 
      monitorThread.start(); 

к классу, где ThreadPoolExecutor находится.

Он будет показывать ваши потоковые состояния в каждые 3 секунды.