2016-05-03 3 views
1

У меня была страшная ночь, пытаясь выяснить, что происходит с RabbitMQ и SpringXD, к сожалению, без успеха.RabbitMQ - Отключение канала: ошибка соединения (SpringXD неоднократно закрывает соединения rabbitmq.)

Проблема: SpringXD закрывает RabbitMQ соединения неоднократно, или отчеты предупреждения, связанные с размером кэша канала.

Фрагмент из журнала SpringXD (во время потока инициализации/автоматического связывания):

2016-05-03T07:42:43+0200 1.3.0.RELEASE WARN 
DeploymentsPathChildrenCache-0 listener.SimpleMessageListenerContainer 
- CachingConnectionFactory's channelCacheSize can not be less than the 
number of concurrentConsumers so it was reset to match: 4 

... 

2016-05-03T07:54:17+0200 1.3.0.RELEASE ERROR AMQP Connection 
192.168.120.125:5672 connection.CachingConnectionFactory - Channel shutdown: connection error 

2016-05-03T17:38:58+0200 1.3.0.RELEASE ERROR AMQP Connection 
192.168.120.125:5672 connection.CachingConnectionFactory - Channel shutdown: connection error; protocol method: 
method<connection.close>(reply-code=504, reply-text=CHANNEL_ERROR - 
second 'channel.open' seen, class-id=20, method-id=10) 

Фрагмент из журнала RabbitMQ:

=WARNING REPORT==== 3-May-2016::08:08:09 === closing AMQP connection <0.22276.61> (192.168.120.125:59350 -> 192.168.120.125:5672): client 
unexpectedly closed TCP connection 

=ERROR REPORT==== 3-May-2016::08:08:11 === closing AMQP connection 0.15409.61> (192.168.120.125:58527 -> 192.168.120.125:5672): 
{writer,send_failed,{error,closed}} 

состояние заблокирован ошибки редко

=ERROR REPORT==== 3-May-2016::17:38:58 === Error on AMQP connection <0.20542.25> (192.168.120.125:59421 -> 192.168.120.125:5672, vhost: 
'/', user: 'xd', state: blocked), channel 7: operation channel.open 
caused a connection exception channel_error: "second 'channel.open' 
seen" 

Моя установка (6 узлов)

- springxd 1.3.0 distributed (zookeeper) 
- RabbitMQ 3.6.0, Erlang R16B03-1 cluster 


    ackMode:     AUTO ## or NONE 
    autoBindDLQ:    false 
    backOffInitialInterval: 1000 
    backOffMaxInterval:  10000 
    backOffMultiplier:   2.0 
    batchBufferLimit:   10000 
    batchingEnabled:   false 
    batchSize:     200 
    batchTimeout:    5000 
    compress:     false 
    concurrency:    4 
    deliveryMode:    NON_PERSISTENT ## or PERSISTENT 
    durableSubscription:  false 
    maxAttempts:    10 
    maxConcurrency:   10 
    prefix:     xdbus. 
    prefetch:     1000 
    replyHeaderPatterns:  STANDARD_REPLY_HEADERS,* 
    republishToDLQ:   false 
    requestHeaderPatterns:  STANDARD_REQUEST_HEADERS,* 
    requeue:     true 
    transacted:    false 
    txSize:     1000 

весна: RabbitMQ:

addresses: 
priv1:5672,priv2:5672,priv3:5672, 
priv4:5672,priv5:5672,priv6:5672 

adminAddresses: 
http://priv1:15672, http://priv2:15672, http://priv3:15672, http://priv4:15672, http://priv5:15672,http://priv6:15672 

nodes: 
[email protected],[email protected],[email protected], 
[email protected],[email protected],[email protected] 

username: xd 
password: xxxx 
virtual_host:/
useSSL: false 

ха-xdbus политика:

- ^xdbus\. all 
- ha-mode: exactly 
- ha-params: 2 
- queue-master-locator: min-masters 

Кролик конф

[ 
{rabbit, 
[ 
    {tcp_listeners, [5672]}, 
    {queue_master_locator, "min-masters"} 
] 
} 
]. 

Когда ackMode не NONE происходит следующее:

В конце концов число потребителей снизится до нуля, и у меня есть зомби потоки, которые не излечивались от этого состояния , что, в свою очередь, вызывает нежелательную очередность.

Когда ackMode это AUTO происходит следующее:

Некоторые сообщения, оставленные не-acked навсегда.

SpringXD потоки и долговечные очереди

модуля Rabbit используются в качестве источника или раковины, не пользовательского автоматического связывания.

Типичные определения потока являются следующие:

Проглатывание:

event_generator | rabbit --mappedRequestHeaders=XDRoutingKey --routingKey='headers[''XDRoutingKey'']' 

Processing/Раковина:

rabbit --queues='xdbus.INQUEUE-A' | ENRICHMENT-PROCESSOR-A | elastic-sink 
rabbit --queues='xdbus.INQUEUE-B' | ENRICHMENT-PROCESSOR-B | elastic-sink 

xdbus.INQUEUE-ххх вручную создается из GUI Rabbit администратора.(Прочные)

Глобальная статистика (от RabbitMQ Admin)

  • Контактов: 190
  • Каналы: (проблема кэша канала возможно?) 2263
  • обмены: 20
  • Очереди: 120
  • Потребители: 1850

Наконец:

Я был бы признателен, если кто-то может ответить, что не так с конфигурацией (я уверен, что сеть работает хорошо, так что нет проблем в сети, и нет никаких проблем, связанных с макс открытых файлов ограничение).

Скорость передачи сообщений варьируется от 2К/с до 30к/с, что является относительной небольшой нагрузкой.

Спасибо!

Иван

ответ

1

Мы видели некоторые similar instability when churning channels at a high rate.

Обход должен был увеличить размер кеша канала, чтобы избежать высокой скорости вспенивания; неясно, где находится нестабильность, но я не верю, что это весной AMQP.

Одна из проблем, однако, заключается в том, что XD не предоставляет channelCacheSize как собственность.

. Ответ по приведенной выше ссылке имеет обход, чтобы добавить свойство, заменив конфигурацию конфигурации шины. Увеличение размера кеша решило проблему пользователя.

У нас есть open JIRA issue to expose the property, но он еще не реализован.

Я вижу, что вы изначально разместили это как «ответ» на этот вопрос.

Может ли кто-нибудь быть более конкретным и объяснить, где именно установить rabbit-bus.xml и почему это происходит в любом случае.

Как говорится там, вам нужно поместить его в директорию конфигурации XD:

xd/config/META-INF/spring-xd/bus/rabbit-bus.xml.

EDIT

Методика с использованием механизма расширения шины вместо ...

$ cat xd/config/META-INF/spring-xd/bus/ext/cf.xml 

<?xml version="1.0" encoding="UTF-8"?> 
<beans xmlns="http://www.springframework.org/schema/beans" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xmlns:context="http://www.springframework.org/schema/context" 
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd 
     http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd"> 

    <bean id="rabbitConnectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory"> 
     <constructor-arg ref="rabbitFactory" /> 
     <property name="addresses" value="${spring.rabbitmq.addresses}" /> 
     <property name="username" value="${spring.rabbitmq.username}" /> 
     <property name="password" value="${spring.rabbitmq.password}" /> 
     <property name="virtualHost" value="${spring.rabbitmq.virtual_host}" /> 
     <property name="channelCacheSize" value="${spring.rabbitmq.channelCacheSize:100}" /> 
    </bean> 

</beans> 

EDIT: РЕЗУЛЬТАТЫ ИСПЫТАНИЙ

заполняемых очередь foo с 1 миллиона сообщений.

concurrency:    10 
    prefetch:     1000 
    txSize:     1000 

.

xd:>stream create foo --definition "rin:rabbit --concurrency=10 --maxConcurrency=10 --prefetch=1000 --txSize=1000 | t1:transform | t2:transform | rout:rabbit --routingKey='''bar'''" --deploy 
Created and deployed new stream 'foo' 

Таким образом, с этой конфигурацией мы получаем 40 потребителей.

Я никогда не видел более 29 издательских каналов от автобуса, было 10 издателей для раковины.

1m сообщения были переданы от foo к bar менее чем за 5 минут (с помощью xdbus.foo.0, xdbus.foo.1 и xdbus.foo.2) - 4m сообщения опубликованы.

Нет ошибок - но мой ноутбук должен остыть: D

+0

Вопрос в том, где находится каталог xd config. ($ SPRING-XD-ROOT/xd/config?) Вместо этого я попробовал это, но это не помогло.
> если (this.getConnectionFactory() InstanceOf CachingConnectionFactory) { > \t \t \t CachingConnectionFactory ср = (CachingConnectionFactory) getConnectionFactory(); > \t \t \t, если (cf.getCacheMode() == CacheMode.CHANNEL &&> cf.getChannelCacheSize() cf.setChannelCacheSize (200); > /*cf.setChannelCacheSize(this.concurrentConsumers);*/ –

+0

Нет, вы не можете использовать такую ​​логику; вам нужно поместить этот точный файл из ответа в точное место, которое я описываю выше, - '/xd/config/META-INF/spring-xd/bus/rabbit-bus.xml'. Он ** заменяет ** существующий файл в банке xd-dirt - это местоположение ('/ xd/config') ранее на пути к классам, так что грязь подберет модифицированную версию конфигурации XML для шины. –

+0

Обратите внимание, что это увеличит размер кеша для шины сообщений кролика; для раковины кролика вам нужно добавить 'channel-cache-size' в' 'в rabbit.xml sink. –