2016-11-10 6 views
1

У меня есть очереди конфигурация, как показано нижеАсинхронные и общий кролик шаблон

@Bean 
    public ConnectionFactory connectionFactory() { 
    CachingConnectionFactory connectionFactory = new   CachingConnectionFactory(hostName); 
    connectionFactory.setUsername(mqUsername); 
    connectionFactory.setPassword(mqPassword); 
    connectionFactory.setVirtualHost(virtualHost); 
    return connectionFactory; 
    } 

    @Bean 
    RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { 
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); 
    rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); 
    return rabbitTemplate; 
    } 

    @Bean 
    public AmqpAdmin amqpAdmin() { 
    RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory()); 
    return rabbitAdmin; 
    } 

и у меня есть асинхронная конфигурация, как

@EnableAsync 
@Configuration 
public class AsyncConfiguration implements AsyncConfigurer { 

    @Override 
    public Executor getAsyncExecutor() { 
     return taskExector(); 
    } 

    @Override 
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { 
     return new SimpleAsyncUncaughtExceptionHandler(); 
    } 

    @Bean 
    public ThreadPoolTaskExecutor taskExector() { 
     ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); 
     taskExecutor.setCorePoolSize(10); 
     taskExecutor.setMaxPoolSize(10); 
     taskExecutor.initialize(); 
     return taskExecutor; 
    } 

}  

И в моем методе асинхронного я использую AMQP администратор и шаблон кролика боб. Так как в конфигурации у меня будет 10 потоков при максимальном выполнении задач, то, что я нашел после того, как приложение зависает, и принимая дамп с помощью исполнительного механизма, я нашел ниже информацию, похоже на тупик при использовании шаблона/шаблона кролика/amqp из номера строки. Что-то не так с этим подходом или как убедиться, что несколько потоков могут получить доступ к этим бобам кролика.

Версии: Spring загрузки 1.4.0.RELEASE, Java 8.

Моя что-то сервис, как этот

@Service 
public class QDispatcherService implements DispatcherService { 
    private final Logger logger = LoggerFactory.getLogger(this.getClass()); 

    @Autowired 
    private AmqpAdmin amqpAdmin; 


    @Autowired 
    RabbitTemplate rabbitTemplate; 

    @Override 
    public void sendData(Data dataObject) throws Exception { 

     try { 
      //something on this properties , I have to check if queue exist or there are messages in it to decide to add message in other queue 
      Properties properties = amqpAdmin.getQueueProperties(queueName); 
      amqpAdmin.declareQueue(new Queue(queueName)); 
      logger.info("***********************DEBUG 4***********************"); 
      rabbitTemplate.convertAndSend(queueName, dataObject); 

     } catch (Exception ex) { 
      ex.printStackTrace(); 
     } 
    } 

} 

{ 
    "threadName": "taskExector-10", 
    "threadId": 77, 
    "blockedTime": -1, 
    "blockedCount": 317, 
    "waitedTime": -1, 
    "waitedCount": 379, 
    "lockName": "[email protected]", 
    "lockOwnerId": -1, 
    "lockOwnerName": null, 
    "inNative": false, 
    "suspended": false, 
    "threadState": "WAITING", 
    "stackTrace": [ 
     { 
     "methodName": "wait", 
     "fileName": "Object.java", 
     "lineNumber": -2, 
     "className": "java.lang.Object", 
     "nativeMethod": true 
     }, 
     { 
     "methodName": "wait", 
     "fileName": "Object.java", 
     "lineNumber": 502, 
     "className": "java.lang.Object", 
     "nativeMethod": false 
     }, 
     { 
     "methodName": "get", 
     "fileName": "BlockingCell.java", 
     "lineNumber": 50, 
     "className": "com.rabbitmq.utility.BlockingCell", 
     "nativeMethod": false 
     }, 
     { 
     "methodName": "uninterruptibleGet", 
     "fileName": "BlockingCell.java", 
     "lineNumber": 89, 
     "className": "com.rabbitmq.utility.BlockingCell", 
     "nativeMethod": false 
     }, 
     { 
     "methodName": "uninterruptibleGetValue", 
     "fileName": "BlockingValueOrException.java", 
     "lineNumber": 33, 
     "className": "com.rabbitmq.utility.BlockingValueOrException", 
     "nativeMethod": false 
     }, 
     { 
     "methodName": "getReply", 
     "fileName": "AMQChannel.java", 
     "lineNumber": 361, 
     "className": "com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation", 
     "nativeMethod": false 
     }, 
     { 
     "methodName": "privateRpc", 
     "fileName": "AMQChannel.java", 
     "lineNumber": 226, 
     "className": "com.rabbitmq.client.impl.AMQChannel", 
     "nativeMethod": false 
     }, 
     { 
     "methodName": "exnWrappingRpc", 
     "fileName": "AMQChannel.java", 
     "lineNumber": 118, 
     "className": "com.rabbitmq.client.impl.AMQChannel", 
     "nativeMethod": false 
     }, 
     { 
     "methodName": "queueDeclare", 
     "fileName": "ChannelN.java", 
     "lineNumber": 844, 
     "className": "com.rabbitmq.client.impl.ChannelN", 
     "nativeMethod": false 
     }, 
     { 
     "methodName": "queueDeclare", 
     "fileName": "ChannelN.java", 
     "lineNumber": 61, 
     "className": "com.rabbitmq.client.impl.ChannelN", 
     "nativeMethod": false 
     }, 
     { 
     "methodName": "invoke", 
     "fileName": null, 
     "lineNumber": -1, 
     "className": "sun.reflect.GeneratedMethodAccessor176", 
     "nativeMethod": false 
     }, 
     { 
     "methodName": "invoke", 
     "fileName": "DelegatingMethodAccessorImpl.java", 
     "lineNumber": 43, 
     "className": "sun.reflect.DelegatingMethodAccessorImpl", 
     "nativeMethod": false 
     }, 
     { 
     "methodName": "invoke", 
     "fileName": "Method.java", 
     "lineNumber": 498, 
     "className": "java.lang.reflect.Method", 
     "nativeMethod": false 
     }, 
     { 
     "methodName": "invoke", 
     "fileName": "CachingConnectionFactory.java", 
     "lineNumber": 916, 
     "className": "org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler", 
     "nativeMethod": false 
     }, 
     { 
     "methodName": "queueDeclare", 
     "fileName": null, 
     "lineNumber": -1, 
     "className": "com.sun.proxy.$Proxy166", 
     "nativeMethod": false 
     }, 
     { 
     "methodName": "declareQueues", 
     "fileName": "RabbitAdmin.java", 
     "lineNumber": 577, 
     "className": "org.springframework.amqp.rabbit.core.RabbitAdmin", 
     "nativeMethod": false 
     }, 
     { 
     "methodName": "access$200", 
     "fileName": "RabbitAdmin.java", 
     "lineNumber": 67, 
     "className": "org.springframework.amqp.rabbit.core.RabbitAdmin", 
     "nativeMethod": false 
     }, 
     { 
     "methodName": "doInRabbit", 
     "fileName": "RabbitAdmin.java", 
     "lineNumber": 209, 
     "className": "org.springframework.amqp.rabbit.core.RabbitAdmin$3", 
     "nativeMethod": false 
     }, 
     { 
     "methodName": "doInRabbit", 
     "fileName": "RabbitAdmin.java", 
     "lineNumber": 206, 
     "className": "org.springframework.amqp.rabbit.core.RabbitAdmin$3", 
     "nativeMethod": false 
     }, 
     { 
     "methodName": "doExecute", 
     "fileName": "RabbitTemplate.java", 
     "lineNumber": 1394, 
     "className": "org.springframework.amqp.rabbit.core.RabbitTemplate", 
     "nativeMethod": false 
     }, 
     { 
     "methodName": "execute", 
     "fileName": "RabbitTemplate.java", 
     "lineNumber": 1367, 
     "className": "org.springframework.amqp.rabbit.core.RabbitTemplate", 
     "nativeMethod": false 
     }, 
     { 
     "methodName": "execute", 
     "fileName": "RabbitTemplate.java", 
     "lineNumber": 1343, 
     "className": "org.springframework.amqp.rabbit.core.RabbitTemplate", 
     "nativeMethod": false 
     }, 
     { 
     "methodName": "declareQueue", 
     "fileName": "RabbitAdmin.java", 
     "lineNumber": 206, 
     "className": "org.springframework.amqp.rabbit.core.RabbitAdmin", 
     "nativeMethod": false 
     }, 
     { 
     "methodName": "sendData", 
     "fileName": "QDispatcherService.java", 
     "lineNumber": 59, 
     "className": "com.mycompany.QDispatcherService", 
     "nativeMethod": false 
     }, 

     .... 

     "lockedMonitors": [ 
     { 
     "className": "java.lang.Object", 
     "identityHashCode": 285810320, 
     "lockedStackFrame": { 
      "methodName": "invoke", 
      "fileName": "CachingConnectionFactory.java", 
      "lineNumber": 916, 
      "className": "org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler", 
      "nativeMethod": false 
     }, 
     "lockedStackDepth": 13 
     } 
    ], 
    "lockedSynchronizers": [ 
     { 
     "className": "java.util.concurrent.ThreadPoolExecutor$Worker", 
     "identityHashCode": 372417558 
     } 
    ], 
    "lockInfo": { 
     "className": "com.rabbitmq.utility.BlockingValueOrException", 
     "identityHashCode": 274673849 
    } 
    }, 

________________________________________________________________________-

Новый след

{ 
"threadName": "taskExector-10", 
"threadId": 77, 
"blockedTime": -1, 
"blockedCount": 37, 
"waitedTime": -1, 
"waitedCount": 1113, 
"lockName": "[email protected]", 
"lockOwnerId": 65, 
"lockOwnerName": "taskExector-8", 
"inNative": false, 
"suspended": false, 
"threadState": "BLOCKED", 
"stackTrace": [ 
    { 
    "methodName": "writeFrame", 
    "fileName": "SocketFrameHandler.java", 
    "lineNumber": 170, 
    "className": "com.rabbitmq.client.impl.SocketFrameHandler", 
    "nativeMethod": false 
    }, 
    { 
    "methodName": "writeFrame", 
    "fileName": "AMQConnection.java", 
    "lineNumber": 542, 
    "className": "com.rabbitmq.client.impl.AMQConnection", 
    "nativeMethod": false 
    }, 
    { 
    "methodName": "transmit", 
    "fileName": "AMQCommand.java", 
    "lineNumber": 104, 
    "className": "com.rabbitmq.client.impl.AMQCommand", 
    "nativeMethod": false 
    }, 
    { 
    "methodName": "quiescingTransmit", 
    "fileName": "AMQChannel.java", 
    "lineNumber": 337, 
    "className": "com.rabbitmq.client.impl.AMQChannel", 
    "nativeMethod": false 
    }, 
    { 
    "methodName": "transmit", 
    "fileName": "AMQChannel.java", 
    "lineNumber": 313, 
    "className": "com.rabbitmq.client.impl.AMQChannel", 
    "nativeMethod": false 
    }, 
    { 
    "methodName": "basicPublish", 
    "fileName": "ChannelN.java", 
    "lineNumber": 686, 
    "className": "com.rabbitmq.client.impl.ChannelN", 
    "nativeMethod": false 
    }, 
    { 
    "methodName": "basicPublish", 
    "fileName": "ChannelN.java", 
    "lineNumber": 668, 
    "className": "com.rabbitmq.client.impl.ChannelN", 
    "nativeMethod": false 
    }, 
    { 
    "methodName": "invoke", 
    "fileName": null, 
    "lineNumber": -1, 
    "className": "sun.reflect.GeneratedMethodAccessor176", 
    "nativeMethod": false 
    }, 
    { 
    "methodName": "invoke", 
    "fileName": "DelegatingMethodAccessorImpl.java", 
    "lineNumber": 43, 
    "className": "sun.reflect.DelegatingMethodAccessorImpl", 
    "nativeMethod": false 
    }, 
    { 
    "methodName": "invoke", 
    "fileName": "Method.java", 
    "lineNumber": 498, 
    "className": "java.lang.reflect.Method", 
    "nativeMethod": false 
    }, 
    { 
    "methodName": "invoke", 
    "fileName": "CachingConnectionFactory.java", 
    "lineNumber": 916, 
    "className": "org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler", 
    "nativeMethod": false 
    }, 
    { 
    "methodName": "basicPublish", 
    "fileName": null, 
    "lineNumber": -1, 
    "className": "com.sun.proxy.$Proxy166", 
    "nativeMethod": false 
    }, 
    { 
    "methodName": "doSend", 
    "fileName": "RabbitTemplate.java", 
    "lineNumber": 1451, 
    "className": "org.springframework.amqp.rabbit.core.RabbitTemplate", 
    "nativeMethod": false 
    }, 
    { 
    "methodName": "doInRabbit", 
    "fileName": "RabbitTemplate.java", 
    "lineNumber": 703, 
    "className": "org.springframework.amqp.rabbit.core.RabbitTemplate$3", 
    "nativeMethod": false 
    }, 
    { 
    "methodName": "doExecute", 
    "fileName": "RabbitTemplate.java", 
    "lineNumber": 1394, 
    "className": "org.springframework.amqp.rabbit.core.RabbitTemplate", 
    "nativeMethod": false 
    }, 
    { 
    "methodName": "execute", 
    "fileName": "RabbitTemplate.java", 
    "lineNumber": 1367, 
    "className": "org.springframework.amqp.rabbit.core.RabbitTemplate", 
    "nativeMethod": false 
    }, 
    { 
    "methodName": "send", 
    "fileName": "RabbitTemplate.java", 
    "lineNumber": 699, 
    "className": "org.springframework.amqp.rabbit.core.RabbitTemplate", 
    "nativeMethod": false 
    }, 
    { 
    "methodName": "convertAndSend", 
    "fileName": "RabbitTemplate.java", 
    "lineNumber": 767, 
    "className": "org.springframework.amqp.rabbit.core.RabbitTemplate", 
    "nativeMethod": false 
    }, 
    { 
    "methodName": "convertAndSend", 
    "fileName": "RabbitTemplate.java", 
    "lineNumber": 754, 
    "className": "org.springframework.amqp.rabbit.core.RabbitTemplate", 
    "nativeMethod": false 
    } 
+0

Вы должны показать свой код, как вы используете 'RabbitTemplate' и' RabbitAdmin' –

+0

Добавлено в главный вопрос – user3444718

+0

Хорошо. Можете ли вы поделиться некоторым простым загрузочным приложением, чтобы играть с нашей стороны? –

ответ

0

Немного необычно объявлять очередь на каждую отправку; что, помимо неэффективности, он должен работать. Трассировка стека указывает, что мы застряли в клиенте кролика, ожидая ответа для объявления очереди. Я видел это, когда более одного потока использует один и тот же канал, но этого никогда не должно происходить, потому что канал всегда возвращается в кеш, когда текущая операция (админ, шаблон) завершается и не может использоваться несколькими потоками.

Возможно, вы захотите попробовать новый барабан 4.0.0.RC1 amqp-client, так как теперь они добавили регистрацию (которая недоступна в клиенте 3.x.x). Это может помочь проследить все.

+0

Спасибо, я постараюсь использовать новую версию. Я хочу исключить, что это связано с тем, что какое-то другое приложение пытается использовать/delete queue в то же время, когда мы пытаемся получить либо реквизиты очереди, либо пытаемся объявить очередь. Поскольку u сказал, что он ждет ответа от кролика, означает ли это, что нет ничего лучше блокировки ресурсов из-за нашего пользовательского пула потоков для асинхронного, I T hink, это не так, как хотелось бы подтвердить. (Исходный эксперимент выглядит, если я удаляю асинхронный характер моего сервиса, он работает нормально, но должен делать больше тестирования). И почему это не время, как установить это как для работы кролика, так и для – user3444718

+0

. Я не знаю внутренних компонентов кроличьего клиента достаточно хорошо - вам придется задавать этот вопрос в группе gobrmm-пользователей google. Я видел это раньше, когда кто-то пытался отправить сообщение каналом, который доставлял возвращенное сообщение, - это был аналогичный тупик, но я не вижу, что вы используете обязательное здесь. –

+0

Я не использую amqp lib напрямую, я использую spring-ampq, который является 1.6.1.RELEASE. (spring-boot-starter-amqp spring boot ver 1.4.0.release) – user3444718

 Смежные вопросы

  • Нет связанных вопросов^_^