У меня есть веб-приложение на JBoss, которое в настоящее время использует встроенный HornetQ для JMS. Мы хотим перейти на HA-кластер ActiveMQ, но я столкнулся со странными проблемами. Одна из моих очередей (periodicDerivationQueue) ведёт себя не так, как с HornetQ. Консоль AMQ показывает, что сообщения ставятся в очередь и извлекаются из неё (dequeued), но до моего потребителя они не доходят. Сначала я предположил, что сообщения по какой-то причине попадают в DLQ, но, похоже, это не так. Насколько я понимаю, AMQ создаёт DLQ автоматически, когда она становится нужна. Однако, когда я смотрю на брокер, никакой DLQ там нет. Как мне понять, куда уходят мои сообщения? (Заголовок вопроса: «Сообщения ActiveMQ извлекаются из очереди (dequeued), но не потребляются».)
У меня также есть проблемы с отладкой со стороны приложения из-за отражения. Я бы хотел установить точку останова на стороне AMQ, чтобы узнать, что происходит с моими сообщениями, но я не знаю, где ее разместить. Есть идеи?
Может ли это быть проблемой сериализации? Я слышал, что иногда различия в сериализации между брокерами JMS могут привести к странному поведению.
Я действительно застрял здесь, и любая помощь будет оценена по достоинству. См. Информацию о конфигурации ниже.
Wildfly 8.2
AMQ 5.13
Потребитель (сообщения сюда не доходят)
/**
 * Handler that turns a {@link PeriodicDerivation} request into a concrete
 * {@link Derivation} (group or determinant) and executes it synchronously
 * through the injected {@link DerivationService}.
 *
 * <p>Both collaborators are injected through setters; neither is
 * null-checked here.
 */
public class PeriodicDerivationExecutionHandlerImpl implements PeriodicDerivationExecutionHandler {

    protected DerivationService derivationService;
    protected DerivationModelService derivationModelService;
    protected Logger logger = LoggerFactory.getLogger(this.getClass());

    /**
     * Builds the derivation described by {@code params}, executes it and closes
     * the session opened at the start of the call.
     *
     * <p>The session is closed with {@code closeSession(true)} after a successful
     * execution and with {@code closeSession(false)} on every failure path
     * (presumably commit vs. rollback; confirm against JbpmHibernateUtil).
     *
     * @param params the periodic derivation request; supplies the item name,
     *               the group/determinant flag and the period
     * @throws Exception wrapping any exception raised while building or
     *                   executing the derivation, or while closing the session
     *                   after a successful execution
     */
    @Override
    public void executeDerivation(PeriodicDerivation params) throws Exception {
        JbpmHibernateUtil.openSession();
        Derivation derivation = null;
        // Tracks whether the success-path close completed, so the finally block
        // knows if the session still has to be released.
        boolean sessionClosed = false;
        try {
            if (params.isGroup()) {
                derivation = new GroupDerivation();
                GroupQueryParameters qp = new GroupQueryParameters();
                qp.setGroupName(params.getItemName());
                derivation.setDerivedItem(derivationModelService.getGroup(qp));
            } else {
                derivation = new DeterminantDerivation();
                DeterminantQueryParameters qp = new DeterminantQueryParameters();
                qp.setDeterminantName(params.getItemName());
                derivation.setDerivedItem(derivationModelService.getDeterminant(qp));
            }
            logger.info("Executing periodic derivation [" + derivation + "]");
            derivation.setModelEffectiveDate(new DateTime());
            derivation.setPeriod(params.getPeriod());
            derivation.getProcessParameters().add(new DerivationProcessParameter(PeriodicDerivation.PERIODIC_PROCESS_VAR, true));
            derivation.setExecutionMode(DerivationExecutionMode.SYNCHRONOUS_LOCAL);
            derivationService.executeDerivation(derivation);
            JbpmHibernateUtil.closeSession(true);
            sessionClosed = true;
        } catch (Exception e) {
            // derivation is still null here if the failure happened before it was created.
            logger.error("Periodic derivation execution failed for [" + derivation + "]", e);
            throw new Exception("Periodic derivation execution failed for [" + derivation + "]", e);
        } finally {
            // Runs for Exceptions (as before) and also for Errors, which the
            // catch block above does not see and which previously left the
            // session open.
            if (!sessionClosed) {
                JbpmHibernateUtil.closeSession(false);
            }
        }
    }

    /** @return the service used to execute derivations */
    public DerivationService getDerivationService() {
        return derivationService;
    }

    /** @param derivationService the service used to execute derivations */
    public void setDerivationService(DerivationService derivationService) {
        this.derivationService = derivationService;
    }

    /** @return the service used to look up the derived group or determinant */
    public DerivationModelService getDerivationModelService() {
        return derivationModelService;
    }

    /** @param derivationModelService the service used to look up the derived group or determinant */
    public void setDerivationModelService(DerivationModelService derivationModelService) {
        this.derivationModelService = derivationModelService;
    }
}
XML-конфигурация потребителя
<!-- Producer side: a gateway for the PeriodicDerivationExecutionHandler interface.
     Calls to executeDerivation are sent to periodicDerivationChannel. -->
<int:gateway id="periodicDerivationExecutionGateway"
service-interface="com.etse.jbpm.scheduler.PeriodicDerivationExecutionHandler">
<int:method name="executeDerivation" request-channel="periodicDerivationChannel" />
</int:gateway>
<!-- The concrete handler that is expected to receive the messages. -->
<bean id="periodicDerivationExecutor"
class="com.etse.jbpm.scheduler.PeriodicDerivationExecutionHandlerImpl">
<property name="derivationService" ref="derivationService" />
<property name="derivationModelService" ref="derivationModelService" />
</bean>
<!-- Consumer side: messages arriving on periodicDerivationChannel are passed to
     periodicDerivationExecutor.executeDerivation. -->
<int:service-activator input-channel="periodicDerivationChannel"
ref="periodicDerivationExecutor" method="executeDerivation" />
<!-- JMS-backed channel: the queue name and listener concurrency come from property
     placeholders. No connection-factory attribute is set here, so whichever default
     the int-jms namespace resolves is used (NOTE(review): confirm it resolves to the
     ActiveMQ factory and not a leftover HornetQ one). -->
<int-jms:channel id="periodicDerivationChannel"
queue-name="${jms.destination.name.periodicderivation}" concurrency="${integration.listener.threads.maximum}"
task-executor="periodicDerivationTaskExecutor" />
ActiveMQ Standalone.xml (Jboss)
<!-- Resource-adapter subsystem fragment that deploys the ActiveMQ RAR.
     The fragment is not closed here; the admin-objects shown further down
     belong to the same resource-adapter element. -->
<subsystem xmlns="urn:jboss:domain:resource-adapters:2.0">
<resource-adapters>
<resource-adapter id="activemq-rar.rar">
<archive>
activemq-rar.rar
</archive>
<!-- Connections from this adapter take part in XA transactions. -->
<transaction-support>XATransaction</transaction-support>
<!-- Single broker on localhost; this is not a failover URL. -->
<config-property name="ServerUrl">
tcp://127.0.0.1:61616?jms.rmIdFromConnectionId=true
</config-property>
<!-- NOTE(review): credentials are stored in plain text in this file. -->
<config-property name="UserName">
admin
</config-property>
<config-property name="Password">
admin
</config-property>
<connection-definitions>
<!-- Bound to java:/ConnectionFactory, the same JNDI name the HornetQ
     InVmConnectionFactory used in the HornetQ configuration below. -->
<connection-definition
class-name="org.apache.activemq.ra.ActiveMQManagedConnectionFactory"
jndi-name="java:/ConnectionFactory"
enabled="true"
pool-name="ConnectionFactory">
<!-- XA connection pool: 1 to 20 connections, not prefilled at startup. -->
<xa-pool>
<min-pool-size>1</min-pool-size>
<max-pool-size>20</max-pool-size>
<prefill>false</prefill>
<is-same-rm-override>false</is-same-rm-override>
</xa-pool>
<!-- XA recovery settings; uses the same admin/admin credentials as above. -->
<recovery>
<recover-credential>
<user-name>admin</user-name>
<password>admin</password>
</recover-credential>
<recover-plugin class-name="org.jboss.jca.core.recovery.ConfigurableRecoveryPlugin">
<config-property name="EnableIsValid">
false
</config-property>
<config-property name="IsValidOverride">
true
</config-property>
<config-property name="EnableClose">
true
</config-property>
</recover-plugin>
</recovery>
</connection-definition>
</connection-definitions>
Очереди/темы
<!-- JNDI bindings for the ActiveMQ destinations. Each admin-object maps a JNDI
     name to the physical queue or topic named in its PhysicalName property.
     NOTE(review): two naming differences from the HornetQ destinations shown
     later in this post; confirm that the application's JNDI lookups match:
       1. DLQ and ExpiryQueue are bound under jms/queue/bpm/ here, but under
          jms/queue/ (no bpm segment) in the HornetQ configuration.
       2. Every ActiveMQTopic is bound under jms/queue/bpm/ here, but under
          jms/topic/bpm/ in the HornetQ configuration. -->
<admin-objects>
<admin-object class-name="org.apache.activemq.command.ActiveMQQueue"
jndi-name="java:jboss/exported/jms/queue/bpm/deferredBpmCommandQueue"
use-java-context="true"
pool-name="deferredBpmCommandQueue">
<config-property name="PhysicalName">
deferredBpmCommandQueue
</config-property>
</admin-object>
<admin-object class-name="org.apache.activemq.command.ActiveMQQueue"
jndi-name="java:jboss/exported/jms/queue/bpm/asyncActionRequestQueue"
use-java-context="true"
pool-name="ActiveMQQueue.asyncActionRequestQueue">
<config-property name="PhysicalName">
asyncActionRequestQueue
</config-property>
</admin-object>
<!-- NOTE(review): HornetQ bound this queue as jms/queue/DLQ. -->
<admin-object class-name="org.apache.activemq.command.ActiveMQQueue"
jndi-name="java:jboss/exported/jms/queue/bpm/DLQ"
use-java-context="true"
pool-name="DLQ">
<config-property name="PhysicalName">
DLQ
</config-property>
</admin-object>
<admin-object class-name="org.apache.activemq.command.ActiveMQQueue" jndi-name="java:jboss/exported/jms/queue/bpm/cacheUpdateReplicationQueue" use-java-context="true" pool-name="ActiveMQQueue.cacheUpdateReplicationQueue">
<config-property name="PhysicalName">
cacheUpdateReplicationQueue
</config-property>
</admin-object>
<!-- The queue this question is about. -->
<admin-object class-name="org.apache.activemq.command.ActiveMQQueue" jndi-name="java:jboss/exported/jms/queue/bpm/periodicDerivationQueue" use-java-context="true" pool-name="ActiveMQQueue.periodicDerivationQueue">
<config-property name="PhysicalName">
periodicDerivationQueue
</config-property>
</admin-object>
<admin-object class-name="org.apache.activemq.command.ActiveMQQueue" jndi-name="java:jboss/exported/jms/queue/bpm/asyncServiceSignalQueue" use-java-context="true" pool-name="ActiveMQQueue.asyncServiceSignalQueue">
<config-property name="PhysicalName">
asyncServiceSignalQueue
</config-property>
</admin-object>
<!-- NOTE(review): a topic bound under a jms/queue/ path; HornetQ used jms/topic/bpm/. -->
<admin-object class-name="org.apache.activemq.command.ActiveMQTopic" jndi-name="java:jboss/exported/jms/queue/bpm/processEventTopic" use-java-context="true" pool-name="ActiveMQTopic.processEventTopic">
<config-property name="PhysicalName">
processEventTopic
</config-property>
</admin-object>
<admin-object class-name="org.apache.activemq.command.ActiveMQQueue" jndi-name="java:jboss/exported/jms/queue/bpm/asyncActionReplyQueue" use-java-context="true" pool-name="ActiveMQQueue.asyncActionReplyQueue">
<config-property name="PhysicalName">
asyncActionReplyQueue
</config-property>
</admin-object>
<!-- NOTE(review): HornetQ bound this queue as jms/queue/ExpiryQueue. -->
<admin-object class-name="org.apache.activemq.command.ActiveMQQueue" jndi-name="java:jboss/exported/jms/queue/bpm/ExpiryQueue" use-java-context="true" pool-name="ActiveMQQueue.ExpiryQueue">
<config-property name="PhysicalName">
ExpiryQueue
</config-property>
</admin-object>
<admin-object class-name="org.apache.activemq.command.ActiveMQTopic" jndi-name="java:jboss/exported/jms/queue/bpm/asyncActionServiceStatusRequestTopic" use-java-context="true" pool-name="ActiveMQTopic.asyncActionServiceStatusRequestTopic">
<config-property name="PhysicalName">
asyncActionServiceStatusRequestTopic
</config-property>
</admin-object>
<admin-object class-name="org.apache.activemq.command.ActiveMQTopic" jndi-name="java:jboss/exported/jms/queue/bpm/asyncActionAffinityRequestTopic" use-java-context="true" pool-name="ActiveMQTopic.asyncActionAffinityRequestTopic">
<config-property name="PhysicalName">
asyncActionAffinityRequestTopic
</config-property>
</admin-object>
<admin-object class-name="org.apache.activemq.command.ActiveMQQueue" jndi-name="java:jboss/exported/jms/queue/bpm/jbpmJobQueue" use-java-context="true" pool-name="ActiveMQQueue.jbpmJobQueue">
<config-property name="PhysicalName">
jbpmJobQueue
</config-property>
</admin-object>
<admin-object class-name="org.apache.activemq.command.ActiveMQTopic" jndi-name="java:jboss/exported/jms/queue/bpm/asyncActionAffinityReplyTopic" use-java-context="true" pool-name="ActiveMQTopic.asyncActionAffinityReplyTopic">
<config-property name="PhysicalName">
asyncActionAffinityReplyTopic
</config-property>
</admin-object>
<admin-object class-name="org.apache.activemq.command.ActiveMQTopic" jndi-name="java:jboss/exported/jms/queue/bpm/cacheUpdateReplicationEventTopic" use-java-context="true" pool-name="ActiveMQTopic.cacheUpdateReplicationEventTopic">
<config-property name="PhysicalName">
cacheUpdateReplicationEventTopic
</config-property>
</admin-object>
<admin-object class-name="org.apache.activemq.command.ActiveMQTopic" jndi-name="java:jboss/exported/jms/queue/bpm/asyncActionServiceStatusTopic" use-java-context="true" pool-name="ActiveMQTopic.asyncActionServiceStatusTopic">
<config-property name="PhysicalName">
asyncActionServiceStatusTopic
</config-property>
</admin-object>
<admin-object class-name="org.apache.activemq.command.ActiveMQQueue" jndi-name="java:jboss/exported/jms/queue/bpm/asyncActionServiceLogRecordQueue" use-java-context="true" pool-name="ActiveMQQueue.asyncActionServiceLogRecordQueue">
<config-property name="PhysicalName">
asyncActionServiceLogRecordQueue
</config-property>
</admin-object>
</admin-objects>
Конфигурация брокера
<!-- Resolves ${...} placeholders from credentials.properties in the ActiveMQ conf directory. -->
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="locations">
<value>file:${activemq.conf}/credentials.properties</value>
</property>
</bean>
<!-- Eagerly created singleton log-query bean; start/stop are wired as its
     init and destroy callbacks. -->
<bean id="logQuery" class="io.fabric8.insight.log.log4j.Log4jLogQuery"
lazy-init="false" scope="singleton"
init-method="start" destroy-method="stop">
</bean>
<!-- Single persistent broker "broker1". No networkConnectors and no shared or
     replicated store are configured in this fragment. -->
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="broker1" dataDirectory="${activemq.data}" persistent="true">
    <!-- Destination policy: the only entry applies to topics (">" wildcard) and caps
         pending messages per consumer at 1000. There is no policyEntry for queues and
         no dead-letter strategy, so broker defaults apply to periodicDerivationQueue. -->
    <destinationPolicy>
        <policyMap>
            <policyEntries>
                <policyEntry topic=">" >
                    <pendingMessageLimitStrategy>
                        <constantPendingMessageLimitStrategy limit="1000"/>
                    </pendingMessageLimitStrategy>
                </policyEntry>
            </policyEntries>
        </policyMap>
    </destinationPolicy>
    <!-- The broker does not create its own JMX connector. -->
    <managementContext>
        <managementContext createConnector="false"/>
    </managementContext>
    <!-- Messages are persisted with KahaDB under the broker data directory. -->
    <persistenceAdapter>
        <kahaDB directory="${activemq.data}/kahadb"/>
    </persistenceAdapter>
    <!-- Resource limits: 70% of the JVM heap for memory, 100 GB store, 50 GB temp. -->
    <systemUsage>
        <systemUsage>
            <memoryUsage>
                <memoryUsage percentOfJvmHeap="70" />
            </memoryUsage>
            <storeUsage>
                <storeUsage limit="100 gb"/>
            </storeUsage>
            <tempUsage>
                <tempUsage limit="50 gb"/>
            </tempUsage>
        </systemUsage>
    </systemUsage>
    <!-- OpenWire listener on port 61616, the port the resource adapter's ServerUrl
         points at. The ampersand between URI options must be written as &amp;
         inside an XML attribute; a raw ampersand makes the file malformed. -->
    <transportConnectors>
        <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    </transportConnectors>
    <!-- Shutdown hook registered with the broker (SpringContextHook). -->
    <shutdownHooks>
        <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
    </shutdownHooks>
</broker>
<import resource="jetty.xml"/>
HornetQ Standalone.xml (Jboss)
<!-- Previous setup: the embedded HornetQ server. The fragment is not closed
     here; the jms-destinations shown further down belong to the same
     hornetq-server element. -->
<subsystem xmlns="urn:jboss:domain:messaging:2.0">
<hornetq-server>
<!-- Persistence was disabled under HornetQ, whereas the ActiveMQ broker
     above is configured with persistent="true". -->
<persistence-enabled>false</persistence-enabled>
<jmx-management-enabled>true</jmx-management-enabled>
<shared-store>true</shared-store>
<journal-type>ASYNCIO</journal-type>
<journal-file-size>102400</journal-file-size>
<journal-min-files>2</journal-min-files>
<!-- Connectors: two Netty connectors plus an in-VM connector. -->
<connectors>
<netty-connector name="netty" socket-binding="messaging"/>
<netty-connector name="netty-throughput" socket-binding="messaging-throughput">
<param key="batch-delay" value="50"/>
</netty-connector>
<in-vm-connector name="in-vm" server-id="0"/>
</connectors>
<!-- Matching acceptors for the connectors above. -->
<acceptors>
<netty-acceptor name="netty" socket-binding="messaging"/>
<netty-acceptor name="netty-throughput" socket-binding="messaging-throughput">
<param key="batch-delay" value="50"/>
<param key="direct-deliver" value="false"/>
</netty-acceptor>
<in-vm-acceptor name="in-vm" server-id="0"/>
</acceptors>
<!-- The guest role may send, consume and create/delete non-durable queues on every address. -->
<security-settings>
<security-setting match="#">
<permission type="send" roles="guest"/>
<permission type="consume" roles="guest"/>
<permission type="createNonDurableQueue" roles="guest"/>
<permission type="deleteNonDurableQueue" roles="guest"/>
</security-setting>
</security-settings>
<!-- Settings for every address: explicit dead-letter and expiry addresses,
     no redelivery delay, paging once an address reaches max-size-bytes. -->
<address-settings>
<address-setting match="#">
<dead-letter-address>jms.queue.DLQ</dead-letter-address>
<expiry-address>jms.queue.ExpiryQueue</expiry-address>
<redelivery-delay>0</redelivery-delay>
<max-size-bytes>104857600</max-size-bytes>
<page-size-bytes>10485760</page-size-bytes>
<page-max-cache-size>10</page-max-cache-size>
<address-full-policy>PAGE</address-full-policy>
<message-counter-history-day-limit>10</message-counter-history-day-limit>
</address-setting>
</address-settings>
<jms-connection-factories>
<!-- In-VM factory bound to java:/ConnectionFactory, the JNDI name the
     ActiveMQ connection-definition now reuses. -->
<connection-factory name="InVmConnectionFactory">
<connectors>
<connector-ref connector-name="in-vm"/>
</connectors>
<entries>
<entry name="java:/ConnectionFactory"/>
</entries>
</connection-factory>
<!-- Remote factory over the netty connector, with reconnect settings. -->
<connection-factory name="RemoteConnectionFactory">
<connectors>
<connector-ref connector-name="netty"/>
</connectors>
<entries>
<entry name="java:jboss/exported/jms/RemoteConnectionFactory"/>
</entries>
<client-failure-check-period>30000</client-failure-check-period>
<connection-ttl>300000</connection-ttl>
<retry-interval>2000</retry-interval>
<retry-interval-multiplier>1</retry-interval-multiplier>
<max-retry-interval>2000</max-retry-interval>
<reconnect-attempts>100</reconnect-attempts>
</connection-factory>
</jms-connection-factories>
Очереди/Темы
<!-- HornetQ destinations. Each one has two JNDI entries: a short local name and
     an exported java:jboss/exported/jms/... name. Queues are under queue/bpm/,
     except DLQ and ExpiryQueue, which sit directly under queue/. Topics are
     under topic/bpm/. -->
<jms-destinations>
<jms-queue name="asyncActionRequestQueue">
<entry name="queue/bpm/asyncActionRequestQueue"/>
<entry name="java:jboss/exported/jms/queue/bpm/asyncActionRequestQueue"/>
</jms-queue>
<jms-queue name="asyncActionReplyQueue">
<entry name="queue/bpm/asyncActionReplyQueue"/>
<entry name="java:jboss/exported/jms/queue/bpm/asyncActionReplyQueue"/>
</jms-queue>
<jms-queue name="asyncServiceSignalQueue">
<entry name="queue/bpm/asyncServiceSignalQueue"/>
<entry name="java:jboss/exported/jms/queue/bpm/asyncServiceSignalQueue"/>
</jms-queue>
<jms-queue name="asyncActionServiceLogRecordQueue">
<entry name="queue/bpm/asyncActionServiceLogRecordQueue"/>
<entry name="java:jboss/exported/jms/queue/bpm/asyncActionServiceLogRecordQueue"/>
</jms-queue>
<jms-queue name="deferredBpmCommandQueue">
<entry name="queue/bpm/deferredBpmCommandQueue"/>
<entry name="java:jboss/exported/jms/queue/bpm/deferredBpmCommandQueue"/>
</jms-queue>
<jms-queue name="jbpmJobQueue">
<entry name="queue/bpm/jbpmJobQueue"/>
<entry name="java:jboss/exported/jms/queue/bpm/jbpmJobQueue"/>
</jms-queue>
<!-- DLQ and ExpiryQueue have no bpm path segment, unlike their ActiveMQ admin-objects. -->
<jms-queue name="DLQ">
<entry name="queue/DLQ"/>
<entry name="java:jboss/exported/jms/queue/DLQ"/>
</jms-queue>
<jms-queue name="ExpiryQueue">
<entry name="queue/ExpiryQueue"/>
<entry name="java:jboss/exported/jms/queue/ExpiryQueue"/>
</jms-queue>
<!-- The queue this question is about. -->
<jms-queue name="periodicDerivationQueue">
<entry name="queue/bpm/periodicDerivationQueue"/>
<entry name="java:jboss/exported/jms/queue/bpm/periodicDerivationQueue"/>
</jms-queue>
<jms-queue name="cacheUpdateReplicationQueue">
<entry name="queue/bpm/cacheUpdateReplicationQueue"/>
<entry name="java:jboss/exported/jms/queue/bpm/cacheUpdateReplicationQueue"/>
</jms-queue>
<!-- Topics are exported under jms/topic/bpm/ here; the ActiveMQ admin-objects
     bind the same topics under jms/queue/bpm/. -->
<jms-topic name="asyncActionServiceStatusTopic">
<entry name="topic/bpm/asyncActionServiceStatusTopic"/>
<entry name="java:jboss/exported/jms/topic/bpm/asyncActionServiceStatusTopic"/>
</jms-topic>
<jms-topic name="asyncActionServiceStatusRequestTopic">
<entry name="topic/bpm/asyncActionServiceStatusRequestTopic"/>
<entry name="java:jboss/exported/jms/topic/bpm/asyncActionServiceStatusRequestTopic"/>
</jms-topic>
<jms-topic name="asyncActionAffinityRequestTopic">
<entry name="topic/bpm/asyncActionAffinityRequestTopic"/>
<entry name="java:jboss/exported/jms/topic/bpm/asyncActionAffinityRequestTopic"/>
</jms-topic>
<jms-topic name="asyncActionAffinityReplyTopic">
<entry name="topic/bpm/asyncActionAffinityReplyTopic"/>
<entry name="java:jboss/exported/jms/topic/bpm/asyncActionAffinityReplyTopic"/>
</jms-topic>
<jms-topic name="processEventTopic">
<entry name="topic/bpm/processEventTopic"/>
<entry name="java:jboss/exported/jms/topic/bpm/processEventTopic"/>
</jms-topic>
<jms-topic name="cacheUpdateReplicationEventTopic">
<entry name="topic/bpm/cacheUpdateReplicationEventTopic"/>
<entry name="java:jboss/exported/jms/topic/bpm/cacheUpdateReplicationEventTopic"/>
</jms-topic>
</jms-destinations>
Пожалуйста, добавьте конфигурацию broker.xml для ActiveMQ, а также конфигурацию потребителя – Ashoka
Спасибо. Я добавил конфигурацию брокера, и я скоро добавлю потребительскую конфигурацию. – user3029642
Добавлена конфигурация потребителя – user3029642