2012-01-15 3 views
5

У меня есть система, реализующая Camel и ActiveMQ для связи между некоторыми серверами. Я хотел бы знать, есть ли способ автоматически истекать и удалять сообщения, отправленные в очередь через X период времени. Поскольку исходный сервер (заполнение очереди) не знает, собирает ли кто-нибудь сообщения, я не хочу, чтобы моя очередь росла до тех пор, пока она не была настолько большой, что что-то сработает. Бонусная карма указывает на того, кто может помочь и предоставить java dsl способ реализовать эту функцию.Автоматическое истечение сообщений в Camel

Решение

// expire message after 2 minutes 
long ttl = System.currentTimeMillis() + 120000; 
// send our info one-way to the group topic 
camelTemplate.sendBodyAndHeader("jms:queue:stats", ExchangePattern.InOnly, stats, "JMSExpiration", ttl); 

ответ

1

Ну setJMSExpiration (длинный выдох):

является то, что вы не должны звоните, когда вы являетесь клиентом. См. Мой разговор об этом немного на форуме ActiveMQ.

http://apache-qpid-developers.2158895.n2.nabble.com/MRG-Java-JMS-Expiration-td7171854.html

+0

В вашем сообщении на форуме было похоже, что вы хотели истечь 10 минут, но вместо этого установили 10 секунд. ех. (10 * (60 * 1000)) vs (10 * 1000) –

+0

@Mondain nope, я хотел 10 секунд. 10 минут - это параметр по умолчанию Qpid, когда истеченные сообщения удаляются из очередей. Например: вы отправили сообщение в очередь с истечением срока действия 10 секунд, поэтому через 10 секунд срок действия сообщений истекает QPid, и ни один потребитель не может его использовать, но де-факто сообщение удаляет bu QPid, когда Thread Policy Policy s), а настройка по умолчанию - то, что они начинают через 10 минут – Eugene

+0

Хорошо, я неправильно понял, что вы там разместили. –

3

ум также, что часы между клиентом - брокера должны быть синхронизированы, для экспирации работать должным образом. Если часы не синхронизированы, то срок действия истекает от клиента, возможно, истек, когда сообщение получено в брокере. Или клиентское время опережает брокера, поэтому срок действия превышает 10 секунд.

Это меня бьет, почему истечение срока зависит от клиента. Таким образом AMQ предлагает плагин, чтобы исправить это, перестроив время, чтобы быть только брокером. See http://activemq.apache.org/timestampplugin.html

1

С нашей стороны мы решили добавить время истечения срока действия в определенные пункты назначения, используя маршрут верблюда, развернутый в самой службе ActiveMQ.

Единственное, что нужно сделать, это создать XML-файл, например, с именем, например. setJMSExpiration.xml:

<beans xmlns="http://www.springframework.org/schema/beans" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xsi:schemaLocation=" 
    http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd 
    http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> 

    <camelContext id="camel-set-expiration" xmlns="http://camel.apache.org/schema/spring"> 
    <!-- Copy route for each destination to expire --> 
    <route id="setJMSExpiration.my.queue.dlq"> 
     <from uri="broker:queue:MY.QUEUE.DLQ"/> 
     <setHeader headerName="JMSExpiration"> 
      <!-- Message will expire after 1 day --> 
      <spel>#{T(java.lang.System).currentTimeMillis() + 86400000}</spel> 
     </setHeader> 
     <to uri="broker:queue:MY.QUEUE.DLQ"/> 
    </route> 
    <route id="setJMSExpiration.another.queue"> 
     <from uri="broker:queue:ANOTHER.QUEUE"/> 
     <setHeader headerName="JMSExpiration"> 
      <!-- Message will expire after 5 days --> 
      <spel>#{T(java.lang.System).currentTimeMillis() + 432000000}</spel> 
     </setHeader> 
     <to uri="broker:queue:ANOTHER.QUEUE"/> 
    </route> 
    </camelContext> 
</beans> 

и импортировать его в activemq.xml конфигурации с:

<!-- Add default Expiration (file in the same directory) --> 
<import resource="setJMSExpiration.xml"/> 

В качестве альтернативы вы можете также предоставить конкретную per destination policies, если вы не хотите, чтобы сообщения с истекшим сроком годности, чтобы достигнуть очереди ActiveMQ.DLQ.

<policyEntry queue="MY.QUEUE.DLQ"> 
    <deadLetterStrategy> 
     <sharedDeadLetterStrategy processExpired="false" /> 
    </deadLetterStrategy> 
</policyEntry> 
<policyEntry queue="ANOTHER.QUEUE"> 
    <deadLetterStrategy> 
     <sharedDeadLetterStrategy processExpired="false" /> 
    </deadLetterStrategy> 
</policyEntry> 

Единственным ограничением этого способа является то, что вы не можете легко использовать подстановочные знаки, как это закодировано (можно, но это будет необходимо несколько приспособлений, используя заголовок назначения JMS в маршруте верблюд).

Мы стараемся, чтобы производители определяли timeToLive (и максимально усилили их), но не всегда можно заставить их изменить свой код, чтобы минимизировать количество таких маршрутов.