0

Я создаю систему, которая будет иметь несколько развертываний набора, и каждое развертывание будет иметь очередь тестовых наборов. Поскольку я хочу, чтобы тестовые классы запускались одновременно при развертывании отдельных пакетов, мне нужно добавить параллелизм в код. Я создал упрощенную версию кода, который я использую, но часть параллелизма не работает, когда я пытаюсь закрыть его.Работа с блокировкойQueue и Multithreads. Все темы застряли в ожидании

Когда вызывается Runner.stopEverything(), результатом является то, что очередь получает опорожнение и ожидает завершения потоков, но даже когда все тесты завершены, ожидание никогда не заканчивается даже с notifyAll() , В результате процесс просто сидит там никогда не заканчивается. Я смотрю на него в режиме отладки, и результат состоит в том, что все 3 потока показывают ожидание.

Главная:

public static void main(String args[]) throws Exception { 
    Runner.queueTestSuites("SD1", Arrays.asList("A", "B", "C")); 
    Runner.queueTestSuites("SD2", Arrays.asList("D", "E", "F")); 
    Runner.queueTestSuites("SD3", Arrays.asList("G", "H", "I")); 

    Thread.sleep(5000); 

    System.out.println("~~~~~~~~~~~~~~~~~~~~~~~~"); 
    Runner.stopEverything(); 
} 

Runner:

public class Runner { 
    private static Map<String, TestQueue> runnerQueueMap = new ConcurrentHashMap<>(); 

    public synchronized static void queueTestSuites(String suiteDeployment, List<String> testSuiteQueueAsJSON) throws Exception { 
    TestQueue queue; 
    if(runnerQueueMap.containsKey(suiteDeployment)) { 
     queue = runnerQueueMap.get(suiteDeployment); 
    } else { 
     queue = new TestQueue(suiteDeployment); 
    } 
    for (int i = 0; i < testSuiteQueueAsJSON.size(); i++) { 
     String name = testSuiteQueueAsJSON.get(i); 
     queue.addToQueue(name); 
    } 
    runnerQueueMap.put(suiteDeployment,queue); 
    } 

    public synchronized static void stopEverything() throws InterruptedException { 
    for (String s : runnerQueueMap.keySet()) { 
     TestQueue q = runnerQueueMap.get(s); 
     q.saveAndClearQueue(); 
    } 

    for (String s : runnerQueueMap.keySet()) { 
     TestQueue q = runnerQueueMap.get(s); 
     q.waitForThread(); 
    } 

    System.out.println("All done at " + new Date()); 
    } 
} 

TestQueue:

public class TestQueue { 

    private Consumer consumer; 
    private Thread consumerThread; 
    private java.util.concurrent.BlockingQueue<String> queue; 
    private String suiteDeployment; 

    public TestQueue(String suiteDeployment) { 
    this.suiteDeployment = suiteDeployment; 
    queue = new ArrayBlockingQueue<>(100); 
    startConsumer(); 
    } 

    public void addToQueue(String testSuite) { 
    try { 
     queue.put(testSuite); 
    } catch (InterruptedException e) { 
     e.printStackTrace(); 
    } 
    } 

    public synchronized void waitForThread() { 
    try { 
     if (consumer.running.get()) { 
     synchronized (consumerThread) { 
      System.out.println("Waiting for " + consumerThread.getName()); 
      consumerThread.wait(); 
     } 
     } 
     System.out.println("Thread complete at " + new Date()); 
    } catch (InterruptedException e) { 
     e.printStackTrace(); 
    } 
    } 

    public void saveAndClearQueue() { 
    List<String> suiteNames = new ArrayList<>(); 
    for (String suite : queue) { 
     suiteNames.add(suite); 
    } 
    queue.clear(); 
    } 

    private void startConsumer() { 
    consumer = new Consumer(queue,suiteDeployment); 
    consumerThread = new Thread(consumer); 
    consumerThread.start(); 
    } 

    private class Consumer implements Runnable{ 
    private BlockingQueue<String> queue; 
    private String suiteDeployment; 
    public AtomicBoolean running; 

    public Consumer(BlockingQueue<String> queue, String suiteDeployment){ 
     this.queue = queue; 
     this.suiteDeployment = suiteDeployment; 
     this.running = new AtomicBoolean(false); 
    } 

    @Override 
    public void run() { 
     try{ 
     while(!Thread.currentThread().isInterrupted()) { 
      String testSuite = queue.take(); 
      this.running.set(true); 
      new Test(testSuite, suiteDeployment).run(); 
      this.running.set(false); 
     } 
     notifyAll(); 
     }catch(Exception e) { 
     e.printStackTrace(); 
     } 
    } 
    } 
} 

Тест:

public class Test { 

    String testSuite = ""; 
    String suiteDeployment = ""; 

    public Test(String testSuite, String suiteDeployment) { 
    this.testSuite = testSuite; 
    this.suiteDeployment = suiteDeployment; 
    } 

    public void run() { 
    int time = new Random().nextInt() % 10000; 
    time = Math.max(time, 3000); 
    System.out.println("Test Started: " + testSuite + " on " + suiteDeployment + " at " + new Date() + " running for " + time + " on thread " + Thread.currentThread().getName()); 
    try { 
     Thread.sleep(time); 
    } catch (InterruptedException e) { 
     e.printStackTrace(); 
    } 
    System.out.println("Test Completed: " + testSuite + " on " + suiteDeployment + " at " + new Date()); 
    } 
} 
+0

'если (consumer.running.get()) { синхронизирована (consumerThread) { ' - то, что происходит, если 'consumer.running' становится ложным, и то тестовый поток вызывает 'notifyAll', когда основной поток находится внутри оператора' if' (но еще не вызвал 'wait')? – immibis

ответ

2

Внутри метода запуска вашего потребителя у вас есть блокирующий вызов queue.take(), что означает, что он будет блокироваться до тех пор, пока в вашей очереди не появится элемент. В конечном итоге у вас заканчиваются элементы внутри очереди, и весь ваш поток блокируется вызовом queue.take(), ожидая, что больше элементов станет доступным для обработки.

Хотя ваш вызов в цикле в то время, когда он проверить, если поток прерывается, вы на самом деле никогда не прерывать нити, так что никогда не получает оценки петлевых в то время как & заблокированных в вызове queue.take()

так что ваши темы остаться в засаде, как они ожидают ввода, чтобы стать строения внутри очереди блокировки

Кроме того, ваш saveAndClear метод должен зафиксировать на нужный объект, который сам по себе очереди, как показано ниже:

public void saveAndClearQueue() { 
    List<String> suiteNames = new ArrayList<String>(); 
    synchronized (queue) { 
     for (String suite : queue) { 
      suiteNames.add(suite); 
     } 
     queue.clear(); 
    } 
    System.out.println("Saved(not executed) : "+suiteNames); 
} 

И ваш метод waitForThread должен делать что-н, как показано ниже:

public void waitForThread() { 
    synchronized (consumerThread) { 
     while (consumer.running.get()) { 
      try { 
       consumerThread.wait(100); 
      } catch (InterruptedException e) { 
       break; 
      } 
     } 
    } 

    if (!consumer.running.get()) { 
     consumerThread.interrupt(); 
    } 

    System.out.println("Thread complete at " + new Date()); 
} 

 Смежные вопросы

  • Нет связанных вопросов^_^