2016-02-10 8 views
1

У меня есть веб-приложение JBoss, которое в настоящее время использует встроенный HornetQ для JMS. Мы хотим переключиться на кластер ActiveMQ HA, но у меня возникают некоторые странные проблемы. Одна из моих очередей (periodDerivationQueue) не ведет себя так же, как с HornetQ. Консоль AMQ показывает, что сообщения находятся в очереди и удалены, но они не доходят до моего потребителя. Сначала я предположил, что сообщения по какой-то причине дежурили в DLQ, но это, похоже, не так. Насколько я понимаю, AMQ не будет создавать DLQ, если это необходимо. Когда я смотрю на брокера, нет DLQ. Как я могу понять, куда идут мои сообщения?Сообщения ActiveMQ dequeud, но не используются

У меня также есть проблемы с отладкой со стороны приложения из-за отражения. Я бы хотел установить точку останова на стороне AMQ, чтобы узнать, что происходит с моими сообщениями, но я не знаю, где ее разместить. Есть идеи?

Может ли это быть проблемой сериализации? Я слышал, что иногда различия в сериализации между брокерами JMS могут привести к странному поведению.

Я действительно застрял здесь, и любая помощь будет оценена по достоинству. См. Информацию о конфигурации ниже.

Wildfly 8.2

AMQ 5.13

Потребительские (сообщения не делает его здесь)

public class PeriodicDerivationExecutionHandlerImpl implements PeriodicDerivationExecutionHandler { 

protected DerivationService derivationService; 
protected DerivationModelService derivationModelService; 

protected Logger logger = LoggerFactory.getLogger(this.getClass()); 

@Override 
public void executeDerivation(PeriodicDerivation params) throws Exception{ 

    JbpmHibernateUtil.openSession(); 
    Derivation derivation = null; 
    try{    

     if (params.isGroup()){ 
      derivation = new GroupDerivation(); 

      GroupQueryParameters qp = new GroupQueryParameters(); 
      qp.setGroupName(params.getItemName()); 
      derivation.setDerivedItem(derivationModelService.getGroup(qp)); 

     }else{ 
      derivation = new DeterminantDerivation(); 

      DeterminantQueryParameters qp = new DeterminantQueryParameters(); 
      qp.setDeterminantName(params.getItemName());    
      derivation.setDerivedItem(derivationModelService.getDeterminant(qp)); 

     } 

     logger.info("Executing periodic derivation [" + derivation + "]"); 

     derivation.setModelEffectiveDate(new DateTime()); 
     derivation.setPeriod(params.getPeriod()); 
     derivation.getProcessParameters().add(new DerivationProcessParameter(PeriodicDerivation.PERIODIC_PROCESS_VAR, true)); 
     derivation.setExecutionMode(DerivationExecutionMode.SYNCHRONOUS_LOCAL); 
     derivationService.executeDerivation(derivation); 

     JbpmHibernateUtil.closeSession(true); 
    }catch(Exception e){ 
     logger.error("Periodic derivation execution failed for [" + derivation + "]",e); 
     JbpmHibernateUtil.closeSession(false); 
     throw new Exception("Periodic derivation execution failed for [" + derivation + "]",e); 
    } 
} 

public DerivationService getDerivationService() { 
    return derivationService; 
} 

public void setDerivationService(DerivationService derivationService) { 
    this.derivationService = derivationService; 
} 

public DerivationModelService getDerivationModelService() { 
    return derivationModelService; 
} 

public void setDerivationModelService(DerivationModelService derivationModelService) { 
    this.derivationModelService = derivationModelService; 
} 

}

Потребительское XML конфигурации

<int:gateway id="periodicDerivationExecutionGateway" 
     service-interface="com.etse.jbpm.scheduler.PeriodicDerivationExecutionHandler"> 
     <int:method name="executeDerivation" request-channel="periodicDerivationChannel" /> 
    </int:gateway> 

    <bean id="periodicDerivationExecutor" 
     class="com.etse.jbpm.scheduler.PeriodicDerivationExecutionHandlerImpl"> 
     <property name="derivationService" ref="derivationService" /> 
     <property name="derivationModelService" ref="derivationModelService" /> 
    </bean> 

    <int:service-activator input-channel="periodicDerivationChannel" 
     ref="periodicDerivationExecutor" method="executeDerivation" /> 

    <int-jms:channel id="periodicDerivationChannel" 
     queue-name="${jms.destination.name.periodicderivation}" concurrency="${integration.listener.threads.maximum}" 
     task-executor="periodicDerivationTaskExecutor" /> 

ActiveMQ Standalone.xml (Jboss)

<subsystem xmlns="urn:jboss:domain:resource-adapters:2.0"> 
     <resource-adapters> 
      <resource-adapter id="activemq-rar.rar"> 
       <archive> 
        activemq-rar.rar 
       </archive> 
       <transaction-support>XATransaction</transaction-support> 
       <config-property name="ServerUrl"> 
        tcp://127.0.0.1:61616?jms.rmIdFromConnectionId=true 
       </config-property> 
       <config-property name="UserName"> 
        admin 
       </config-property> 
       <config-property name="Password"> 
        admin 
       </config-property> 
       <connection-definitions> 

        <connection-definition 
         class-name="org.apache.activemq.ra.ActiveMQManagedConnectionFactory" 
         jndi-name="java:/ConnectionFactory" 
      enabled="true" 
      pool-name="ConnectionFactory"> 
         <xa-pool> 
          <min-pool-size>1</min-pool-size> 
          <max-pool-size>20</max-pool-size> 
          <prefill>false</prefill> 
          <is-same-rm-override>false</is-same-rm-override> 
         </xa-pool> 
         <recovery> 
          <recover-credential> 
           <user-name>admin</user-name> 
           <password>admin</password> 
          </recover-credential> 
          <recover-plugin class-name="org.jboss.jca.core.recovery.ConfigurableRecoveryPlugin"> 
           <config-property name="EnableIsValid"> 
            false 
           </config-property> 
           <config-property name="IsValidOverride"> 
            true 
           </config-property> 
           <config-property name="EnableClose"> 
            true 
           </config-property> 
          </recover-plugin> 
         </recovery> 
        </connection-definition> 

       </connection-definitions> 

Очереди/темы

<admin-objects> 
        <admin-object class-name="org.apache.activemq.command.ActiveMQQueue" 
           jndi-name="java:jboss/exported/jms/queue/bpm/deferredBpmCommandQueue" 
           use-java-context="true" 
           pool-name="deferredBpmCommandQueue"> 
         <config-property name="PhysicalName"> 
          deferredBpmCommandQueue 
         </config-property> 
        </admin-object> 
        <admin-object class-name="org.apache.activemq.command.ActiveMQQueue" 
           jndi-name="java:jboss/exported/jms/queue/bpm/asyncActionRequestQueue" 
           use-java-context="true" 
           pool-name="ActiveMQQueue.asyncActionRequestQueue"> 
         <config-property name="PhysicalName"> 
          asyncActionRequestQueue 
         </config-property> 
        </admin-object> 
        <admin-object class-name="org.apache.activemq.command.ActiveMQQueue" 
           jndi-name="java:jboss/exported/jms/queue/bpm/DLQ" 
           use-java-context="true" 
           pool-name="DLQ"> 
         <config-property name="PhysicalName"> 
          DLQ 
         </config-property> 
        </admin-object> 
        <admin-object class-name="org.apache.activemq.command.ActiveMQQueue" jndi-name="java:jboss/exported/jms/queue/bpm/cacheUpdateReplicationQueue" use-java-context="true" pool-name="ActiveMQQueue.cacheUpdateReplicationQueue"> 
         <config-property name="PhysicalName"> 
          cacheUpdateReplicationQueue 
         </config-property> 
        </admin-object> 
        <admin-object class-name="org.apache.activemq.command.ActiveMQQueue" jndi-name="java:jboss/exported/jms/queue/bpm/periodicDerivationQueue" use-java-context="true" pool-name="ActiveMQQueue.periodicDerivationQueue"> 
         <config-property name="PhysicalName"> 
          periodicDerivationQueue 
         </config-property> 
        </admin-object> 
        <admin-object class-name="org.apache.activemq.command.ActiveMQQueue" jndi-name="java:jboss/exported/jms/queue/bpm/asyncServiceSignalQueue" use-java-context="true" pool-name="ActiveMQQueue.asyncServiceSignalQueue"> 
         <config-property name="PhysicalName"> 
          asyncServiceSignalQueue 
         </config-property> 
        </admin-object> 
        <admin-object class-name="org.apache.activemq.command.ActiveMQTopic" jndi-name="java:jboss/exported/jms/queue/bpm/processEventTopic" use-java-context="true" pool-name="ActiveMQTopic.processEventTopic"> 
         <config-property name="PhysicalName"> 
          processEventTopic 
         </config-property> 
        </admin-object> 
        <admin-object class-name="org.apache.activemq.command.ActiveMQQueue" jndi-name="java:jboss/exported/jms/queue/bpm/asyncActionReplyQueue" use-java-context="true" pool-name="ActiveMQQueue.asyncActionReplyQueue"> 
         <config-property name="PhysicalName"> 
          asyncActionReplyQueue 
         </config-property> 
        </admin-object> 
        <admin-object class-name="org.apache.activemq.command.ActiveMQQueue" jndi-name="java:jboss/exported/jms/queue/bpm/ExpiryQueue" use-java-context="true" pool-name="ActiveMQQueue.ExpiryQueue"> 
         <config-property name="PhysicalName"> 
          ExpiryQueue 
         </config-property> 
        </admin-object> 
        <admin-object class-name="org.apache.activemq.command.ActiveMQTopic" jndi-name="java:jboss/exported/jms/queue/bpm/asyncActionServiceStatusRequestTopic" use-java-context="true" pool-name="ActiveMQTopic.asyncActionServiceStatusRequestTopic"> 
         <config-property name="PhysicalName"> 
          asyncActionServiceStatusRequestTopic 
         </config-property> 
        </admin-object> 
        <admin-object class-name="org.apache.activemq.command.ActiveMQTopic" jndi-name="java:jboss/exported/jms/queue/bpm/asyncActionAffinityRequestTopic" use-java-context="true" pool-name="ActiveMQTopic.asyncActionAffinityRequestTopic"> 
         <config-property name="PhysicalName"> 
          asyncActionAffinityRequestTopic 
         </config-property> 
        </admin-object> 
        <admin-object class-name="org.apache.activemq.command.ActiveMQQueue" jndi-name="java:jboss/exported/jms/queue/bpm/jbpmJobQueue" use-java-context="true" pool-name="ActiveMQQueue.jbpmJobQueue"> 
         <config-property name="PhysicalName"> 
          jbpmJobQueue 
         </config-property> 
        </admin-object> 
        <admin-object class-name="org.apache.activemq.command.ActiveMQTopic" jndi-name="java:jboss/exported/jms/queue/bpm/asyncActionAffinityReplyTopic" use-java-context="true" pool-name="ActiveMQTopic.asyncActionAffinityReplyTopic"> 
         <config-property name="PhysicalName"> 
          asyncActionAffinityReplyTopic 
         </config-property> 
        </admin-object> 
        <admin-object class-name="org.apache.activemq.command.ActiveMQTopic" jndi-name="java:jboss/exported/jms/queue/bpm/cacheUpdateReplicationEventTopic" use-java-context="true" pool-name="ActiveMQTopic.cacheUpdateReplicationEventTopic"> 
         <config-property name="PhysicalName"> 
          cacheUpdateReplicationEventTopic 
         </config-property> 
        </admin-object> 
        <admin-object class-name="org.apache.activemq.command.ActiveMQTopic" jndi-name="java:jboss/exported/jms/queue/bpm/asyncActionServiceStatusTopic" use-java-context="true" pool-name="ActiveMQTopic.asyncActionServiceStatusTopic"> 
         <config-property name="PhysicalName"> 
          asyncActionServiceStatusTopic 
         </config-property> 
        </admin-object> 
        <admin-object class-name="org.apache.activemq.command.ActiveMQQueue" jndi-name="java:jboss/exported/jms/queue/bpm/asyncActionServiceLogRecordQueue" use-java-context="true" pool-name="ActiveMQQueue.asyncActionServiceLogRecordQueue"> 
         <config-property name="PhysicalName"> 
          asyncActionServiceLogRecordQueue 
         </config-property> 
        </admin-object> 
       </admin-objects> 

Брокер конфигурации

<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"> 
    <property name="locations"> 
     <value>file:${activemq.conf}/credentials.properties</value> 
    </property> 
</bean> 


<bean id="logQuery" class="io.fabric8.insight.log.log4j.Log4jLogQuery" 
     lazy-init="false" scope="singleton" 
     init-method="start" destroy-method="stop"> 
</bean> 

<broker xmlns="http://activemq.apache.org/schema/core" brokerName="broker1" dataDirectory="${activemq.data}" persistent="true"> 

    <destinationPolicy> 
     <policyMap> 
      <policyEntries> 
      <policyEntry topic=">" > 

       <pendingMessageLimitStrategy> 
       <constantPendingMessageLimitStrategy limit="1000"/> 
       </pendingMessageLimitStrategy> 
      </policyEntry> 
      </policyEntries> 
     </policyMap> 
    </destinationPolicy> 


    <managementContext> 
     <managementContext createConnector="false"/> 
    </managementContext> 


    <persistenceAdapter> 
     <kahaDB directory="${activemq.data}/kahadb"/> 
    </persistenceAdapter> 

     <systemUsage> 
     <systemUsage> 
      <memoryUsage> 
       <memoryUsage percentOfJvmHeap="70" /> 
      </memoryUsage> 
      <storeUsage> 
       <storeUsage limit="100 gb"/> 
      </storeUsage> 
      <tempUsage> 
       <tempUsage limit="50 gb"/> 
      </tempUsage> 
     </systemUsage> 
    </systemUsage> 


    <transportConnectors> 
     <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/> 
    </transportConnectors> 

    <shutdownHooks> 
     <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" /> 
    </shutdownHooks> 

</broker> 
<import resource="jetty.xml"/> 

HornetQ Standalone.xml (Jboss)

<subsystem xmlns="urn:jboss:domain:messaging:2.0"> 
     <hornetq-server> 
      <persistence-enabled>false</persistence-enabled> 
      <jmx-management-enabled>true</jmx-management-enabled> 
      <shared-store>true</shared-store> 
      <journal-type>ASYNCIO</journal-type> 
      <journal-file-size>102400</journal-file-size> 
      <journal-min-files>2</journal-min-files> 

      <connectors> 
       <netty-connector name="netty" socket-binding="messaging"/> 
       <netty-connector name="netty-throughput" socket-binding="messaging-throughput"> 
        <param key="batch-delay" value="50"/> 
       </netty-connector> 
       <in-vm-connector name="in-vm" server-id="0"/> 
      </connectors> 

      <acceptors> 
       <netty-acceptor name="netty" socket-binding="messaging"/> 
       <netty-acceptor name="netty-throughput" socket-binding="messaging-throughput"> 
        <param key="batch-delay" value="50"/> 
        <param key="direct-deliver" value="false"/> 
       </netty-acceptor> 
       <in-vm-acceptor name="in-vm" server-id="0"/> 
      </acceptors> 

      <security-settings> 
       <security-setting match="#"> 
        <permission type="send" roles="guest"/> 
        <permission type="consume" roles="guest"/> 
        <permission type="createNonDurableQueue" roles="guest"/> 
        <permission type="deleteNonDurableQueue" roles="guest"/> 
       </security-setting> 
      </security-settings> 

      <address-settings> 
       <address-setting match="#"> 
        <dead-letter-address>jms.queue.DLQ</dead-letter-address> 
        <expiry-address>jms.queue.ExpiryQueue</expiry-address> 
        <redelivery-delay>0</redelivery-delay> 
        <max-size-bytes>104857600</max-size-bytes> 
        <page-size-bytes>10485760</page-size-bytes> 
        <page-max-cache-size>10</page-max-cache-size> 
        <address-full-policy>PAGE</address-full-policy> 
        <message-counter-history-day-limit>10</message-counter-history-day-limit> 
       </address-setting> 
      </address-settings> 

      <jms-connection-factories> 
       <connection-factory name="InVmConnectionFactory"> 
        <connectors> 
         <connector-ref connector-name="in-vm"/> 
        </connectors> 
        <entries> 
         <entry name="java:/ConnectionFactory"/> 
        </entries> 
       </connection-factory> 
       <connection-factory name="RemoteConnectionFactory"> 
        <connectors> 
         <connector-ref connector-name="netty"/> 
        </connectors> 
        <entries> 
         <entry name="java:jboss/exported/jms/RemoteConnectionFactory"/> 
        </entries> 
        <client-failure-check-period>30000</client-failure-check-period> 
        <connection-ttl>300000</connection-ttl> 
        <retry-interval>2000</retry-interval> 
        <retry-interval-multiplier>1</retry-interval-multiplier> 
        <max-retry-interval>2000</max-retry-interval> 
        <reconnect-attempts>100</reconnect-attempts> 
       </connection-factory> 
      </jms-connection-factories> 

Очереди/Темы

<jms-destinations> 
       <jms-queue name="asyncActionRequestQueue"> 
        <entry name="queue/bpm/asyncActionRequestQueue"/> 
        <entry name="java:jboss/exported/jms/queue/bpm/asyncActionRequestQueue"/> 
       </jms-queue> 
       <jms-queue name="asyncActionReplyQueue"> 
        <entry name="queue/bpm/asyncActionReplyQueue"/> 
        <entry name="java:jboss/exported/jms/queue/bpm/asyncActionReplyQueue"/> 
       </jms-queue> 
       <jms-queue name="asyncServiceSignalQueue"> 
        <entry name="queue/bpm/asyncServiceSignalQueue"/> 
        <entry name="java:jboss/exported/jms/queue/bpm/asyncServiceSignalQueue"/> 
       </jms-queue> 
       <jms-queue name="asyncActionServiceLogRecordQueue"> 
        <entry name="queue/bpm/asyncActionServiceLogRecordQueue"/> 
        <entry name="java:jboss/exported/jms/queue/bpm/asyncActionServiceLogRecordQueue"/> 
       </jms-queue> 
       <jms-queue name="deferredBpmCommandQueue"> 
        <entry name="queue/bpm/deferredBpmCommandQueue"/> 
        <entry name="java:jboss/exported/jms/queue/bpm/deferredBpmCommandQueue"/> 
       </jms-queue> 
       <jms-queue name="jbpmJobQueue"> 
        <entry name="queue/bpm/jbpmJobQueue"/> 
        <entry name="java:jboss/exported/jms/queue/bpm/jbpmJobQueue"/> 
       </jms-queue> 
       <jms-queue name="DLQ"> 
        <entry name="queue/DLQ"/> 
        <entry name="java:jboss/exported/jms/queue/DLQ"/> 
       </jms-queue> 
       <jms-queue name="ExpiryQueue"> 
        <entry name="queue/ExpiryQueue"/> 
        <entry name="java:jboss/exported/jms/queue/ExpiryQueue"/> 
       </jms-queue> 
       <jms-queue name="periodicDerivationQueue"> 
        <entry name="queue/bpm/periodicDerivationQueue"/> 
        <entry name="java:jboss/exported/jms/queue/bpm/periodicDerivationQueue"/> 
       </jms-queue> 
       <jms-queue name="cacheUpdateReplicationQueue"> 
        <entry name="queue/bpm/cacheUpdateReplicationQueue"/> 
        <entry name="java:jboss/exported/jms/queue/bpm/cacheUpdateReplicationQueue"/> 
       </jms-queue> 
       <jms-topic name="asyncActionServiceStatusTopic"> 
        <entry name="topic/bpm/asyncActionServiceStatusTopic"/> 
        <entry name="java:jboss/exported/jms/topic/bpm/asyncActionServiceStatusTopic"/> 
       </jms-topic> 
       <jms-topic name="asyncActionServiceStatusRequestTopic"> 
        <entry name="topic/bpm/asyncActionServiceStatusRequestTopic"/> 
        <entry name="java:jboss/exported/jms/topic/bpm/asyncActionServiceStatusRequestTopic"/> 
       </jms-topic> 
       <jms-topic name="asyncActionAffinityRequestTopic"> 
        <entry name="topic/bpm/asyncActionAffinityRequestTopic"/> 
        <entry name="java:jboss/exported/jms/topic/bpm/asyncActionAffinityRequestTopic"/> 
       </jms-topic> 
       <jms-topic name="asyncActionAffinityReplyTopic"> 
        <entry name="topic/bpm/asyncActionAffinityReplyTopic"/> 
        <entry name="java:jboss/exported/jms/topic/bpm/asyncActionAffinityReplyTopic"/> 
       </jms-topic> 
       <jms-topic name="processEventTopic"> 
        <entry name="topic/bpm/processEventTopic"/> 
        <entry name="java:jboss/exported/jms/topic/bpm/processEventTopic"/> 
       </jms-topic> 
       <jms-topic name="cacheUpdateReplicationEventTopic"> 
        <entry name="topic/bpm/cacheUpdateReplicationEventTopic"/> 
        <entry name="java:jboss/exported/jms/topic/bpm/cacheUpdateReplicationEventTopic"/> 
       </jms-topic> 
      </jms-destinations> 
+0

Пожалуйста, добавьте конфигурацию broker.xml для activemq вместе с информацией о конфигурации пользователя – Ashoka

+0

Спасибо. Я добавил конфигурацию брокера, и я скоро добавлю потребительскую конфигурацию. – user3029642

+0

Добавлена ​​конфигурация потребителя – user3029642

ответ

2

ObjectMessage безопасности сериализации была проблема.

Объекты ObjectMessage зависят от Java-сериализации служебной нагрузки маршала/немаршала. Этот процесс обычно считается небезопасным, поскольку вредоносная полезная нагрузка может использовать хост-систему. Поэтому, начиная с версий 5.12.2 и 5.13.0, ActiveMQ принуждает пользователей к явно белым спискам, которые можно обменять с помощью ObjectMessages.

Я видел это несколько дней назад и добавил белый список, но это не решило проблему. Я также пытался работать против AMQ 5.11.3, и это не сработало. По-видимому, они добавили функцию безопасности в 5.11.3. Во всяком случае, я добавил это (-Dorg.apache.activemq.SERIALIZABLE_PACKAGES = "*") на стороне клиента и аргументы AMQ vm, и теперь все работает так, как должно.

Имейте в виду, что параметр командной строки, который я использовал, является уязвимостью безопасности , которую я открыто открыл в своем брокере, который может разрешить злоумышленнику выполнять код в моей системе. Правильный путь к использует этот флаг для явного перечня классов, которые вы разрешаете , десериализованных или наиболее подходящих для использования групповых шаблонов пакетов, чтобы избежать явного перечисления отдельных классов и подпакетов в доверенном родительском пакете.