2015-03-10 4 views
1

Недавно я столкнулся с необходимостью использования JMS-потребителем в Spring Integration - способной потреблять пакет большой громкости, не подчеркивая, что моя целевая база данных Oracle слишком много коммитов.Практическое руководство. Внедрение BatchMessageListenerContainer для массового потребления очереди JMS

DefaultMessageListenerContainer не поддерживает ничего, кроме сообщений по транзакциям сообщений.

Я googled для решений и нашел пару - но многие из них пострадали от реализации не по наследству от DMLC, а путем клонирования и изменения исходного исходного кода от того же самого - что делает его уязвимым для взлома в случае, если я позже желаю чтобы перейти к более поздней версии spring-jms. Также клонированный код ссылается на частные свойства DMLC, которые, следовательно, должны быть исключены. И чтобы все это работало, нужно было несколько интерфейсов и пользовательский прослушиватель сообщений. В общем, я не чувствовал себя комфортно.

Итак - что делать?

ответ

2

Хорошо - это простое и компактное решение, которое полностью основано на одном классе, полученном из DefaultMessageListenerContainer.

Я только протестировал с помощью адаптера с подключением к сообщениям и ChainedTransactionManager, хотя - поскольку это своего рода базовый сценарий, когда нужно делать такие вещи.

Это код:

package dk.itealisten.myservice.spring.components; 

import java.util.ArrayList; 
import java.util.Enumeration; 

import javax.jms.Destination; 
import javax.jms.JMSException; 
import javax.jms.Message; 
import javax.jms.MessageConsumer; 

import org.springframework.jms.listener.DefaultMessageListenerContainer; 

public class BatchMessageListenerContainer extends DefaultMessageListenerContainer { 

    public static final int DEFAULT_BATCH_SIZE = 100; 

    public int batchSize = DEFAULT_BATCH_SIZE; 

    /** 
     * Override the method receiveMessage to return an instance of BatchMessage - an inner class being declared further down. 
     */ 
    @Override 
    protected Message receiveMessage(MessageConsumer consumer) throws JMSException { 
     BatchMessage batch = new BatchMessage(); 
     while (!batch.releaseAfterMessage(super.receiveMessage(consumer))); 
     return batch.messages.size() == 0 ? null : batch; 
    } 

    /** 
     * As BatchMessage implements the javax.jms.Message interface it fits perfectly into the DMLC - only caveat is that SimpleMessageConverter dont know how to convert it to a Spring Integration Message - but that can be helped. 
     * As BatchMessage will only serve as a container to carry the actual javax.jms.Message's from DMLC to the MessageListener it need not provide meaningful implementations of the methods of the interface as long as they are there. 
     */ 
    protected class BatchMessage implements Message { 

     public ArrayList<Message> messages = new ArrayList<Message>(); 

     /** 
      * Add message to the collection of messages and return true if the batch meets the criteria for releasing it to the MessageListener. 
      */  
     public boolean releaseAfterMessage(Message message) { 
      if(message != null) { 
       messages.add(message); 
      } 
      // Are we ready to release? 
      return message == null || messages.size() >= batchSize; 
     } 

     @Override 
     public String getJMSMessageID() throws JMSException { 
      return null; 
     } 

     @Override 
... 

Ниже приведен пример, показывающий, как он может быть использован в контексте Spring приложения:

<beans xmlns="http://www.springframework.org/schema/beans" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xmlns:int="http://www.springframework.org/schema/integration" 
    xmlns:jms="http://www.springframework.org/schema/integration/jms" 
    xmlns:p="http://www.springframework.org/schema/p" 
    xsi:schemaLocation=" 
    http://www.springframework.org/schema/beans  
    http://www.springframework.org/schema/beans/spring-beans-4.0.xsd 
    http://www.springframework.org/schema/integration 
    http://www.springframework.org/schema/integration/spring-integration-4.0.xsd 
    http://www.springframework.org/schema/integration/jms 
    http://www.springframework.org/schema/integration/jms/spring-integration-jms-4.0.xsd"> 

<!-- Plug in the BatchMessageListenerContainer in a message-driven-channel-adapter --> 
<jms:message-driven-channel-adapter container-class="dk.itealisten.myservice.spring.components.BatchMessageListenerContainer" 
    acknowledge="transacted" 
    channel="from.mq" 
    concurrent-consumers="5" 
    max-concurrent-consumers="15" 
    connection-factory="jmsConnectionFactory" 
    transaction-manager="transactionManager" 
    destination="my.mq.queue" 
    /> 

<!-- Flow processing the BatchMessages being posted on the "from.mq" channel --> 
<int:chain input-channel="from.mq" output-channel="nullChannel"> 
    <int:splitter expression="payload.messages" /> 
    <!-- This is where we deal with conversion to spring messages as the payload is now a single standard javax.jms.Message implementation --> 
    <int:transformer ref="smc" method="fromMessage"/> 
    <!-- And finally we persist --> 
    <int:service-activator ref="jdbcPublisher" method="persist"/> 
</int:chain> 

<!-- Various supporting beans --> 

<!-- A bean to handle the database persistance --> 
<bean id="jdbcPersistor" class="dk.itealisten.myservice.spring.components.JdbcPersistor" p:dataSource-ref="dataSource" /> 

<!-- A bean to handle the conversion that could not take place in the MessageListener as it don't know how to convert a BatchMessage --> 
<bean id="smc" class="org.springframework.jms.support.converter.SimpleMessageConverter"/> 

<!-- Transaction manager must make sure messages are committed outbound (JDBC) before cleaned up inbound (JMS). --> 
<bean id="transactionManager" class="org.springframework.data.transaction.ChainedTransactionManager"> 
    <constructor-arg name="transactionManagers"> 
    <list> 
     <bean class="org.springframework.jms.connection.JmsTransactionManager" p:connectionFactory-ref="jmsConnectionFactory" /> 
     <bean class="org.springframework.jdbc.datasource.DataSourceTransactionManager" p:dataSource-ref="dataSource" /> 
    </list> 
    </constructor-arg> 
</bean>