2016-11-10 9 views
0

У меня есть тест, который блокирует (первый блок кода). У меня есть несколько элементов, которые работают вместе. У меня есть блокирующая очередь, в которую я помещаю события, тогда у меня есть потребитель, который их снимает и отправляет в Amazon Kinesis. Я уверен, что мой тест блокируется, потому что очередь блокирует моего пользователя, хотя я думал, что он работает в отдельном потоке.Основная тема зависает из-за потребителя BlockingQueue (который, как я думал, был в другом потоке)

// Test.java 
@Test 
public void testWhileLoop() throws InterruptedException { 
    ArrayBlockingQueue<Event> testQ = new ArrayBlockingQueue<Event>(1024); 
    // mockKinesis is a mock at the class level. 
    KPLPoster kpl = new KPLPoster("TestStream", mockKinesis, testQ); 
    Event event = new Event("TestMessage", "TestPartition"); 
    ListenableFuture<UserRecordResult> fakeReturn = Mockito.mock(ListenableFuture.class); 

    final AtomicInteger numberOfWhileLoops = new AtomicInteger(); 

    Mockito.doAnswer(invocation -> { 
     numberOfWhileLoops.incrementAndGet(); 
     return fakeReturn; 
    }) 
    .when(mockKinesis) 
    .addUserRecord("TestStream", "TestPartition", ByteBuffer.wrap("TestMessage".getBytes())); 

    kpl.run(); // Hangs here 

    for(int i = 100; i > 0; i--){ 
     testQ.put(event); 
    } 

    kpl.stop(); 
    kpl = null; 

    assert(numberOfWhileLoops.toString()).equals("100"); 
} 

Вот метод запуска BaseKinesisPoster, который наследует мой KPLPoster. Следует отметить, что BaseKinesisPoster реализует интерфейс Runnable.

//BaseKinesisPoster.java 
@Override 
public void run() { 
    shutdown = false; 
    while (!shutdown && !(Thread.currentThread().isInterrupted())) { 
     try { 
      this.runOnce(); 
     } catch (InterruptedException e) { 
      Thread.currentThread().interrupt(); 
     }catch (Exception e){ 
      e.printStackTrace(); 
     } 
    } 
} 

Наконец, здесь является частью соответствующего runOnce() метода мой KPLPoster (который расширяет BaseKinesisPoster).

// KPLPoster.java 
@Override 
protected void runOnce() throws Exception { 
    Event event = inputQueue.take(); 
    //other stuff in my method 
} 

Как я могу убедиться, что блокирование моего потребителя очереди не блокирует мой тестовый/основной поток?

+0

Я удалил свой ответ, поскольку я пропустил некоторые ключевые детали. – Taylor

ответ

4

Когда вы звоните

Thread.run(); 

он вызывает метод называется. Ничего особенного не происходит, и метод запускается в текущем потоке.

Когда вы звоните

Thread.start(); 

Это запускает поток, который, в свою очередь, вызывает запуск() в этом новом потоке.

BTW Thread.stop() будет вызывать UnsupportedOperationException на Java 8. Его не следует использовать. Вы должны позволить ему закончить естественным путем.

+0

Я не использую 'Thread.stop()' в потоке, его реализацию я сделал сам. Он в основном превращает 'shutdown = true', поэтому он выпрыгивает из цикла while. Спасибо за указатель. – bwighthunter

+0

Я настоятельно рекомендую не переопределять стандартные методы, ни стандартные классы, как Thread. –

+0

Я не переопределяю. Метод stop относится к классу внутри потока, а не к самой нити. Это прекрасно, так? Если нет, почему это было бы плохо? Я полностью понимаю, что не перекрываю метод остановки Thread. – bwighthunter