2014-08-29 5 views
1

Попытка иметь JMS MessageConsumer выжить при перезагрузке ActiveMQ, поэтому он может повторно подключаться с использованием протокола переадресации транспорта.JMS MessageConsumer Использование MessageListener завершается при завершении ActiveMQ

Однако он заканчивается при выключении ActiveMQ.

Это похоже на ошибку, что было сообщено и «решен», но я все еще видел это в последней версии ActiveMQ 5.10.0

Я использовал следующий Maven зависимость

<dependency> 
     <groupId>org.apache.activemq</groupId> 
     <artifactId>activemq-all</artifactId> 
     <version>5.10.0</version> 
    </dependency> 

Ниже приведен пример кода с использованием

public class SimpleConsumer { 

public static void main(String[] args) throws Exception { 
    String url = "failover:(tcp://ACTIVE_MQ_HOST:61616)"; 
    String destination = "test-topic"; 

    TopicConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
      url); 

    ActiveMQConnection connection = (ActiveMQConnection) connectionFactory 
      .createConnection(); 

    Session session = connection.createSession(false, 
      Session.AUTO_ACKNOWLEDGE); 

    Topic topic = session.createTopic(destination); 

    MessageConsumer consumer = session.createConsumer(topic); 
    connection.start(); 

// Uncomment these lines and comment out the lines below and it will work 
//  while (true) { 
//   Message msg = consumer.receive(); 
//   if (msg instanceof TextMessage) { 
//    System.out.println("msg received = " + msg); 
//   } 
//  } 

    consumer.setMessageListener(new MessageListener() { 

     public void onMessage(Message msg) { 
      System.out.println("msg received = " + msg); 
     } 

    }); 

} 

}

Я бы как и для работы с MessageListener, он не является блокирующим и асинхронным.

Любая помощь с этим очень ценится.

Что-то, что я уже пробовал, как было предложено JIRA, приведенным выше, заключается в том, чтобы запустить это в потоке, отличном от daemon, но это не сработало.

Я попробовал этот

public class SimpleConsumerThread { 

    public static void main(String[] args) throws Exception { 


     Thread t = new Thread() { 
      public void run() { 
       try {    
        String url = "failover:(tcp://ACTIVEMQ_HOST:61616)"; 
        String destination = "test-topic"; 

        TopicConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url); 

        ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection(); 

        Session session = connection.createSession(false, 
        Session.AUTO_ACKNOWLEDGE); 

        Topic topic = session.createTopic(destination); 

        MessageConsumer consumer = session.createConsumer(topic); 
        connection.start(); 
        consumer.setMessageListener(new MessageListener() { 

         public void onMessage(Message msg) { 
          System.out.println("msg received = " + msg); 
         } 

        }); 
       } catch (JMSException e) { 
        e.printStackTrace(); 
       }    
      } 
     }; 
     t.setDaemon(false); 

     t.start(); 

    } 

} 

ответ

1

Причина многопоточного решение не работает происходит потому, что поток завершается после запуска() метод завершается, и тогда вы не имеете не-демон нить работает, как и раньше , Это не очень хорошая идея, чтобы зависеть от внутренней модели потоковой обработки сторонней библиотеки, чтобы ваше приложение работало.

Лучшее решение, которое будет работать независимо от ошибок или других тонкостей конфигурации в клиенте ActiveMQ, заключается в использовании парадигмы while (true) sleep(), чтобы сохранить ваш основной поток в живых.

1

Thanks Tim,

Да, это сработало. Я просто добавил, чтобы сохранить хотя бы один поток пользователей, чтобы программа не прерывалась.

while(true) { 
     Thread.sleep(1000); 
    } 

веселит,

public class SimpleConsumer { 

    static Logger logger = Logger.getLogger(SimpleConsumer.class); 

    public static void main(String[] args) throws Exception { 
     String url = "failover:(tcp://sydapp057lx.fxdms.net:61615)"; 
     String destination = "test-topic"; 

     TopicConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
       url); 

     ActiveMQConnection connection = (ActiveMQConnection) connectionFactory 
       .createConnection(); 

     connection.setExceptionListener(new ExceptionListener() { 
      public void onException(JMSException e) { 
       logger.debug("got exception = " + e); 
      } 
     }); 

     Session session = connection.createSession(false, 
       Session.AUTO_ACKNOWLEDGE); 

     Topic topic = session.createTopic(destination); 

     MessageConsumer consumer = session.createConsumer(topic); 
     connection.start(); 

     consumer.setMessageListener(new MessageListener() { 

      public void onMessage(Message msg) { 
       logger.debug("msg received = " + msg); 
      } 

     }); 

     while(true) { 
      Thread.sleep(1000); 
     } 

    } 


}