У нас есть кластер с двумя узлами RabbitMQ с политикой Ha-all. Мы используем Spring AMQP в нашем приложении для общения с RabbitMQ. Часть производителя работает нормально, но потребитель работает некоторое время и паузы. Производители и потребители работают как разные приложения. Дополнительная информация о потребительской части.Spring Amqp Consumer приостанавливается после запуска на некоторое время
- мы используем
SimpleMessageListenerContainer
сChannelAwareMessageListener
, используйте ручной режимack
и по умолчаниюprefetch(1)
- В нашем приложении мы создаем очередь (по требованию) и добавить его к слушателю
- Когда мы начинали с 10
ConcurrentConsumers
и 20MaxConcurrentConsumers
, потребление происходит около 15 часов и паузы. Такая ситуация происходит в течение 1 часа, когда мы увеличиваемMaxConcurrentConsumers
до 75.
На RabbitMQ UI, мы видим, каналы с 3/4 ип ack
сообщений ред на вкладке канала, когда происходит такая ситуация, до тех пор он просто не имеют 1 un ack
ed сообщение.
Наш свалка для резьбы аналогична this. Но с биением сердца до 60 не помогло улучшить эту ситуацию.
В большинстве дампов потоков имеется следующее сообщение. При необходимости я приложу весь свалку потока. Дайте мне знать, если мне не хватает какой-либо настройки, которая может привести к тому, что потребитель остановится?
"pool-6-thread-16" #86 prio=5 os_prio=0 tid=0x00007f4db09cb000 nid=0x3b33 waiting on condition [0x00007f4ebebec000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000007b9930b68> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:350)
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$InternalConsumer.handleDelivery(BlockingQueueConsumer.java:660)
at com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:144)
at com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:99)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Подробнее Мы динамически добавлять и удалять очереди в SimpleMessageListenerContainer и мы подозреваем, что это вызывает некоторые проблемы, потому что каждый раз, когда мы добавить или удалить очереди от слушателя, все BlockingQueueConsumer удалены и созданы заново. Как вы думаете, может ли это вызвать эту проблему?
Вам нужно опубликовать полную версию dump где-то, возможно, не здесь, потому что она слишком большая; может быть, что-то вроде пастебина или github gist. Скорее всего, контейнерный поток застрял в вашем коде где-нибудь. –
@GaryRussell: http://pastebin.com/UrBLfn2C - это папка, содержащая полный дамп потока. – Kot
Дамп потока выглядит хорошо - все потоки контейнера ждут в 'nextMessage()', поэтому похоже, что ваша текущая теория верна - что-то в сети тихо удалило соединение - некоторые маршрутизаторы делают это для простоя соединений. Установка запрошенного биения должна поддерживать соединение живым - вам нужно будет использовать сетевой монитор (tcpdump, wireshark и т. Д.), Чтобы понять это. –