У меня есть тест, который блокирует (первый блок кода). У меня есть несколько элементов, которые работают вместе. У меня есть блокирующая очередь, в которую я помещаю события, тогда у меня есть потребитель, который их снимает и отправляет в Amazon Kinesis. Я уверен, что мой тест блокируется, потому что очередь блокирует моего пользователя, хотя я думал, что он работает в отдельном потоке.Основная тема зависает из-за потребителя BlockingQueue (который, как я думал, был в другом потоке)
// Test.java
@Test
public void testWhileLoop() throws InterruptedException {
ArrayBlockingQueue<Event> testQ = new ArrayBlockingQueue<Event>(1024);
// mockKinesis is a mock at the class level.
KPLPoster kpl = new KPLPoster("TestStream", mockKinesis, testQ);
Event event = new Event("TestMessage", "TestPartition");
ListenableFuture<UserRecordResult> fakeReturn = Mockito.mock(ListenableFuture.class);
final AtomicInteger numberOfWhileLoops = new AtomicInteger();
Mockito.doAnswer(invocation -> {
numberOfWhileLoops.incrementAndGet();
return fakeReturn;
})
.when(mockKinesis)
.addUserRecord("TestStream", "TestPartition", ByteBuffer.wrap("TestMessage".getBytes()));
kpl.run(); // Hangs here
for(int i = 100; i > 0; i--){
testQ.put(event);
}
kpl.stop();
kpl = null;
assert(numberOfWhileLoops.toString()).equals("100");
}
Вот метод запуска BaseKinesisPoster, который наследует мой KPLPoster. Следует отметить, что BaseKinesisPoster реализует интерфейс Runnable.
//BaseKinesisPoster.java
@Override
public void run() {
shutdown = false;
while (!shutdown && !(Thread.currentThread().isInterrupted())) {
try {
this.runOnce();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}catch (Exception e){
e.printStackTrace();
}
}
}
Наконец, здесь является частью соответствующего runOnce()
метода мой KPLPoster (который расширяет BaseKinesisPoster).
// KPLPoster.java
@Override
protected void runOnce() throws Exception {
Event event = inputQueue.take();
//other stuff in my method
}
Как я могу убедиться, что блокирование моего потребителя очереди не блокирует мой тестовый/основной поток?
Я удалил свой ответ, поскольку я пропустил некоторые ключевые детали. – Taylor