Я просто не понимаю, почему мой тайм-аут не работает. Все, что я хочу сделать, это просто подождать
в течение 10 секунд для некоторого потока, чтобы поместить сообщение в BlockedQueue<String>
и на тайм-аут вернуть какой-то ответ на клиент.Обработка тайм-аута ReadTimeoutHandler
public class NioAsynChatPipelineFactory implements ChannelPipelineFactory {
private static Timer timer = new HashedWheelTimer();
private final ChannelHandler timeoutHandler = new ReadTimeoutHandler(timer, 10);
@Override
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("decoder", new HttpRequestDecoder());
pipeline.addLast("encoder", new HttpResponseEncoder());
pipeline.addLast("handler", new NioAsynChatHandler());
pipeline.addLast("timeout", this.timeoutHandler);
return pipeline;
}
}
Теперь мой обработчик выглядит так.
public class NioAsynChatHandler extends SimpleChannelUpstreamHandler{
@Override
public void handleUpstream(
ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
super.handleUpstream(ctx, e);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
throws Exception {
System.out.println("Exception");
\\writing some kind of response and closing channel.
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
Thread thread = new Thread(new ConsmerTask(e.getChannel()));
thread.start();
}
и внутри ConsumerTask Я просто жду BlockingQueue, чтобы получить ответ
public class ConsumerTask implements Runnable{
private Channel channel;
public ConsumerTask(Channel channel){
this.channel = channel;
}
@Override
public void run() {
try{
while(true){
String message = queue.take();
}
} catch(InterruptedException ex){
Thread.currentThread.interrupt();
} finally{
//write something to channel and close it
}
}
Моя проблема в том, что я не вижу, что любой excpetion происходит на тайм-аут. Что я делаю неправильно?
Update:
public static final BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<String>();
На самом деле мой вопрос более общий характер, как закрыть канал на тайм-аут во время ожидания чего-то во внешнем потоке?
Update 2: Другой вопрос: из-за того, что я бегу внешний поток в Ча было бы лучше использовать OrderedMemoryAwareThreadPoolExecutor
в трубопроводе? Увеличит ли производительность.
Не следует ли использовать опрос() вместо того, чтобы принимать? Какую реализацию вы выбрали для BlockingQueue? –
На самом деле, неважно, пользуюсь ли я опросом() или take(). В опросе я могу указать тайм-аут и не требовать while (true) loop, а take() сразу получает элементы. Что касается очереди блокировки, см. Мое обновление. –