2016-06-30 3 views
1

У нас есть кластер с двумя узлами RabbitMQ с политикой Ha-all. Мы используем Spring AMQP в нашем приложении для общения с RabbitMQ. Часть производителя работает нормально, но потребитель работает некоторое время и паузы. Производители и потребители работают как разные приложения. Дополнительная информация о потребительской части.Spring Amqp Consumer приостанавливается после запуска на некоторое время

  • мы используем SimpleMessageListenerContainer с ChannelAwareMessageListener, используйте ручной режим ack и по умолчанию prefetch(1)
  • В нашем приложении мы создаем очередь (по требованию) и добавить его к слушателю
  • Когда мы начинали с 10 ConcurrentConsumers и 20 MaxConcurrentConsumers, потребление происходит около 15 часов и паузы. Такая ситуация происходит в течение 1 часа, когда мы увеличиваем MaxConcurrentConsumers до 75.

На RabbitMQ UI, мы видим, каналы с 3/4 ип ack сообщений ред на вкладке канала, когда происходит такая ситуация, до тех пор он просто не имеют 1 un ack ed сообщение.

Наш свалка для резьбы аналогична this. Но с биением сердца до 60 не помогло улучшить эту ситуацию.

В большинстве дампов потоков имеется следующее сообщение. При необходимости я приложу весь свалку потока. Дайте мне знать, если мне не хватает какой-либо настройки, которая может привести к тому, что потребитель остановится?

"pool-6-thread-16" #86 prio=5 os_prio=0 tid=0x00007f4db09cb000 nid=0x3b33 waiting on condition [0x00007f4ebebec000] 
    java.lang.Thread.State: WAITING (parking) 
    at sun.misc.Unsafe.park(Native Method) 
    - parking to wait for <0x00000007b9930b68> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) 
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) 
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039) 
    at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:350) 
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$InternalConsumer.handleDelivery(BlockingQueueConsumer.java:660) 
    at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:144) 
    at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:99) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

Подробнее Мы динамически добавлять и удалять очереди в SimpleMessageListenerContainer и мы подозреваем, что это вызывает некоторые проблемы, потому что каждый раз, когда мы добавить или удалить очереди от слушателя, все BlockingQueueConsumer удалены и созданы заново. Как вы думаете, может ли это вызвать эту проблему?

+0

Вам нужно опубликовать полную версию dump где-то, возможно, не здесь, потому что она слишком большая; может быть, что-то вроде пастебина или github gist. Скорее всего, контейнерный поток застрял в вашем коде где-нибудь. –

+0

@GaryRussell: http://pastebin.com/UrBLfn2C - это папка, содержащая полный дамп потока. – Kot

+0

Дамп потока выглядит хорошо - все потоки контейнера ждут в 'nextMessage()', поэтому похоже, что ваша текущая теория верна - что-то в сети тихо удалило соединение - некоторые маршрутизаторы делают это для простоя соединений. Установка запрошенного биения должна поддерживать соединение живым - вам нужно будет использовать сетевой монитор (tcpdump, wireshark и т. Д.), Чтобы понять это. –

ответ

0

Ваша проблема находится где-то ниже по течению в целевом слушателе.

Посмотрите, prefetch(1) причина этого:

this.queue = new LinkedBlockingQueue<Delivery>(prefetchCount); 

И дальше, если мы не опрашивать эту очередь то, что мы имеем здесь?

BlockingQueueConsumer.this.queue.put(new Delivery(consumerTag, envelope, properties, body)); 

Право, парковка на замке.

0

AMQP-621 теперь объединен с мастером; мы выпустим 1.6.1.RELEASE в ближайшие дни.

 Смежные вопросы

  • Нет связанных вопросов^_^