2015-11-27 8 views
0

I Интеграция системы обмена сообщениями Blazeds с ActiveMQ:.Разъединение адаптера BlazeDS JMS

Я настраиваю BlazeDS для создания надежного назначения flex с использованием flex.messaging.services.messaging.adapters.JMSAdapter (конфигурация ниже).

Мне удалось создать небольшое приложение, которое регистрируется на эту тему и получает сообщение. Поскольку мне нужно получить сообщение, отправленное, когда я был в автономном режиме, я создаю прочный запрос.

Все работает нормально, пока я правильно отписываю потребитель гибкого диска.

Моя проблема заключается в том, что потребитель гибкого диска не требует отписки. Например, когда I закройте браузер.

В этой ситуации тема по-прежнему активна (я могу видеть ее с веб-консоли ActiveMQ), и она потребляет сообщения.

Когда я снова подключаюсь к новому экземпляру приложения flex, соединение в порядке, но я не получил никакого сообщения. И те, которые были отправлены, я не был в отъезде, ни новый. Я даже не могу удалить тему с помощью веб-консоли ActiveMQ: javax.jms.JMSException: Durable consumer is in use.

Единственное решение для удаления темы - перезапустить веб-приложение, связанное с брокером BlazeDS.

Может ли кто-нибудь дать мне другое решение?

это моя конфигурация BlazeDS

<adapters> 
    <adapter-definition id="actionscript" class="flex.messaging.services.messaging.adapters.ActionScriptAdapter" default="true" /> 
    <adapter-definition id="jms" class="flex.messaging.services.messaging.adapters.JMSAdapter"/> 
</adapters> 

... 

    <destination id="warehouse-topic-jms"> 
    <properties> 
     <jms> 
      <destination-type>Topic</destination-type> 
      <message-type>javax.jms.ObjectMessage</message-type> 
      <connection-factory>java:comp/env/jms/flex/TopicConnectionFactory</connection-factory> 
      <destination-jndi-name>java:comp/env/jms/warehouse</destination-jndi-name> 
      <delivery-mode>PERSISTENT</delivery-mode> 
      <message-priority>DEFAULT_PRIORITY</message-priority> 
      <acknowledge-mode>AUTO_ACKNOWLEDGE</acknowledge-mode> 
      <initial-context-environment> 
       <property> 
        <name>Context.INITIAL_CONTEXT_FACTORY</name> 
        <value>org.apache.activemq.jndi.ActiveMQInitialContextFactory</value> 
       </property> 
       <property> 
        <name>Context.PROVIDER_URL</name> 
        <value>tcp://localhost:61616</value> 
       </property> 
      </initial-context-environment> 
     </jms> 
     <server> 
      <durable>true</durable>    
     </server> 
    </properties> 

    <adapter ref="jms"/> 
</destination> 

это контекст Tomcat, чтобы выставить JNDI ресурсы

<Resource name="jms/flex/TopicConnectionFactory" 
    type="org.apache.activemq.ActiveMQConnectionFactory" 
    description="JMS Connection Factory" 
    factory="org.apache.activemq.jndi.JNDIReferenceFactory" 
    brokerURL="tcp://localhost:61616" 
    brokerName="myBroker"/> 
<Resource name="jms/warehouse" 
    type="org.apache.activemq.command.ActiveMQTopic" 
    description="warehouse.topic" 
    factory="org.apache.activemq.jndi.JNDIReferenceFactory" 
    physicalName="warehouse.topic"/> 

и это мой прогибается потребитель

<mx:Consumer id="consumer" 
       channelConnect="consumer_channelConnectHandler(event)" 
       channelFault="consumer_channelFaultHandler(event)" destination="warehouse-topic-jms" 
       fault="consumer_faultHandler(event)" message="consumer_messageHandler(event)"/> 

ответ

0

Единственное решение, которое я найденный, должен использовать JMX MBEAN, выставленный BlazeDS, чтобы заставить JMSAdapter удалить ActiveMQ T OPIC. Таким образом, я могу воссоздать новую тему с тем же идентификатором клиента и получить сообщения.

Вот код метода Java я реализовал

public boolean deleteTopicQueue(String clientId) throws Exception { 
     if(clientId==null){ 
      throw new Exception("Error removing topic: null name provided"); 
     } 
     clientId=clientId.trim(); 
     JMXServiceURL url = new JMXServiceURL(DEFAULT_JMX_ADAPTER_URL); 
     try (JMXConnector jmxc = JMXConnectorFactory.connect(url, null)){ 
      MBeanServerConnection mbsc = jmxc.getMBeanServerConnection(); 
      ObjectName pattern = new ObjectName(DEFAULT_JMSADAPTER_MEAN_QUERY); 
      Set<ObjectName> names = 
       new TreeSet<ObjectName>(mbsc.queryNames(pattern, null)); 

      if(names.size()<=0){ 
       logger.info("Error Removig topic "+clientId+": No JMSAdapter found "); 
       throw new Exception("Error Removig topic "+clientId+": No JMSAdapter found "); 
      } 

      while(names.iterator().hasNext()){ 
       ObjectName ob = names.iterator().next(); 
       JMSAdapterControlMBean obProxy = JMX.newMXBeanProxy(mbsc, ob,JMSAdapterControlMBean.class); 
       String[] consumerList= obProxy.getTopicConsumerIds(); 
       for(String consumer :consumerList){ 
        if(consumer.trim().equals(clientId)){ 
         logger.info("Removing "+consumer+" from "+ob.getCanonicalName()); 
         obProxy.removeConsumer(consumer); 
         return true; 
        } 
       } 
      } 
      logger.debug("No consumer with ID "+clientId+" Found"); 
      return false; 
     } catch (Exception e) { 
      logger.info("Error Removig topic "+clientId+": No JMSAdapter found "); 
      throw new Exception("Error Removig topic "+clientId+" :"+e.getMessage()); 
     } 
    }