2016-10-11 6 views
2

How to implement an MQTT server using Spring Integration?

When I run the MQTT Outbound Channel Adapter example, it throws an error:

Executing command line: /Library/Java/JavaVirtualMachines/jdk1.8.0_45.jdk/Contents/Home/bin/java -classpath /Users/afarber/src/spring-newbie/MqttOutbound/target/classes:/Users/afarber/.m2/repository/org/springframework/boot/spring-boot/1.4.1.RELEASE/spring-boot-1.4.1.RELEASE.jar:/Users/afarber/.m2/repository/org/springframework/boot/spring-boot-starter/1.4.1.RELEASE/spring-boot-starter-1.4.1.RELEASE.jar:/Users/afarber/.m2/repository/org/springframework/boot/spring-boot-autoconfigure/1.4.1.RELEASE/spring-boot-autoconfigure-1.4.1.RELEASE.jar:/Users/afarber/.m2/repository/org/springframework/boot/spring-boot-starter-logging/1.4.1.RELEASE/spring-boot-starter-logging-1.4.1.RELEASE.jar:/Users/afarber/.m2/repository/ch/qos/logback/logback-classic/1.1.7/logback-classic-1.1.7.jar:/Users/afarber/.m2/repository/ch/qos/logback/logback-core/1.1.7/logback-core-1.1.7.jar:/Users/afarber/.m2/repository/org/slf4j/jcl-over-slf4j/1.7.21/jcl-over-slf4j-1.7.21.jar:/Users/afarber/.m2/repository/org/slf4j/jul-to-slf4j/1.7.21/jul-to-slf4j-1.7.21.jar:/Users/afarber/.m2/repository/org/slf4j/log4j-over-slf4j/1.7.21/log4j-over-slf4j-1.7.21.jar:/Users/afarber/.m2/repository/org/yaml/snakeyaml/1.17/snakeyaml-1.17.jar:/Users/afarber/.m2/repository/org/slf4j/slf4j-api/1.7.16/slf4j-api-1.7.16.jar:/Users/afarber/.m2/repository/org/springframework/spring-context/4.3.2.RELEASE/spring-context-4.3.2.RELEASE.jar:/Users/afarber/.m2/repository/org/springframework/spring-aop/4.3.2.RELEASE/spring-aop-4.3.2.RELEASE.jar:/Users/afarber/.m2/repository/org/springframework/spring-beans/4.3.2.RELEASE/spring-beans-4.3.2.RELEASE.jar:/Users/afarber/.m2/repository/org/springframework/spring-expression/4.3.2.RELEASE/spring-expression-4.3.2.RELEASE.jar:/Users/afarber/.m2/repository/org/springframework/spring-core/4.3.2.RELEASE/spring-core-4.3.2.RELEASE.jar:/Users/afarber/.m2/repository/commons-logging/commons-logging/1.2/commons-logging-1.2.jar:/Users/afarber/.m2/repository/org/springframework/integration/spring-integrati
on-core/4.3.2.RELEASE/spring-integration-core-4.3.2.RELEASE.jar:/Users/afarber/.m2/repository/org/springframework/spring-messaging/4.3.3.RELEASE/spring-messaging-4.3.3.RELEASE.jar:/Users/afarber/.m2/repository/org/springframework/spring-tx/4.3.3.RELEASE/spring-tx-4.3.3.RELEASE.jar:/Users/afarber/.m2/repository/org/springframework/retry/spring-retry/1.1.3.RELEASE/spring-retry-1.1.3.RELEASE.jar:/Users/afarber/.m2/repository/org/springframework/integration/spring-integration-mqtt/4.3.2.RELEASE/spring-integration-mqtt-4.3.2.RELEASE.jar:/Users/afarber/.m2/repository/org/eclipse/paho/org.eclipse.paho.client.mqttv3/1.0.2/org.eclipse.paho.client.mqttv3-1.0.2.jar de.afarber.mqttoutbound.MqttJavaApplication 

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v1.4.1.RELEASE)

2016-10-11 21:53:36.811 INFO 2102 --- [   main] d.a.mqttoutbound.MqttJavaApplication  : Starting MqttJavaApplication on mba.local with PID 2102 (/Users/afarber/src/spring-newbie/MqttOutbound/target/classes started by afarber in /Users/afarber/src/spring-newbie/MqttOutbound) 
2016-10-11 21:53:36.816 INFO 2102 --- [   main] d.a.mqttoutbound.MqttJavaApplication  : No active profile set, falling back to default profiles: default 
2016-10-11 21:53:36.960 INFO 2102 --- [   main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@35a50a4c: startup date [Tue Oct 11 21:53:36 CEST 2016]; root of context hierarchy 
2016-10-11 21:53:37.724 INFO 2102 --- [   main] o.s.b.f.config.PropertiesFactoryBean  : Loading properties file from URL [jar:file:/Users/afarber/.m2/repository/org/springframework/integration/spring-integration-core/4.3.2.RELEASE/spring-integration-core-4.3.2.RELEASE.jar!/META-INF/spring.integration.default.properties] 
2016-10-11 21:53:37.729 INFO 2102 --- [   main] o.s.i.config.IntegrationRegistrar  : No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created. 
2016-10-11 21:53:37.933 INFO 2102 --- [   main] faultConfiguringBeanFactoryPostProcessor : No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created. 
2016-10-11 21:53:37.947 INFO 2102 --- [   main] faultConfiguringBeanFactoryPostProcessor : No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created. 
2016-10-11 21:53:38.143 INFO 2102 --- [   main] o.s.b.f.config.PropertiesFactoryBean  : Loading properties file from URL [jar:file:/Users/afarber/.m2/repository/org/springframework/integration/spring-integration-core/4.3.2.RELEASE/spring-integration-core-4.3.2.RELEASE.jar!/META-INF/spring.integration.default.properties] 
2016-10-11 21:53:38.148 INFO 2102 --- [   main] trationDelegate$BeanPostProcessorChecker : Bean 'integrationGlobalProperties' of type [class org.springframework.beans.factory.config.PropertiesFactoryBean] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying) 
2016-10-11 21:53:38.177 INFO 2102 --- [   main] trationDelegate$BeanPostProcessorChecker : Bean 'integrationGlobalProperties' of type [class java.util.Properties] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying) 
2016-10-11 21:53:38.592 INFO 2102 --- [   main] o.s.s.c.ThreadPoolTaskScheduler   : Initializing ExecutorService 'taskScheduler' 
2016-10-11 21:53:39.064 INFO 2102 --- [   main] o.s.j.e.a.AnnotationMBeanExporter  : Registering beans for JMX exposure on startup 
2016-10-11 21:53:39.077 INFO 2102 --- [   main] o.s.c.support.DefaultLifecycleProcessor : Starting beans in phase -2147483648 
2016-10-11 21:53:39.078 INFO 2102 --- [   main] o.s.i.endpoint.EventDrivenConsumer  : Adding {message-handler:mqttJavaApplication.mqttOutbound.serviceActivator} as a subscriber to the 'mqttOutboundChannel' channel 
2016-10-11 21:53:39.078 INFO 2102 --- [   main] o.s.integration.channel.DirectChannel : Channel 'application.mqttOutboundChannel' has 1 subscriber(s). 
2016-10-11 21:53:39.079 INFO 2102 --- [   main] o.s.i.endpoint.EventDrivenConsumer  : started mqttJavaApplication.mqttOutbound.serviceActivator 
2016-10-11 21:53:39.079 INFO 2102 --- [   main] o.s.c.support.DefaultLifecycleProcessor : Starting beans in phase 0 
2016-10-11 21:53:39.079 INFO 2102 --- [   main] ProxyFactoryBean$MethodInvocationGateway : started mqttJavaApplication$MyGateway 
2016-10-11 21:53:39.079 INFO 2102 --- [   main] GatewayCompletableFutureProxyFactoryBean : started mqttJavaApplication$MyGateway 
2016-10-11 21:53:39.080 INFO 2102 --- [   main] o.s.i.endpoint.EventDrivenConsumer  : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel 
2016-10-11 21:53:39.080 INFO 2102 --- [   main] o.s.i.channel.PublishSubscribeChannel : Channel 'application.errorChannel' has 1 subscriber(s). 
2016-10-11 21:53:39.080 INFO 2102 --- [   main] o.s.i.endpoint.EventDrivenConsumer  : started _org.springframework.integration.errorLogger 
2016-10-11 21:53:39.093 INFO 2102 --- [   main] d.a.mqttoutbound.MqttJavaApplication  : Started MqttJavaApplication in 2.962 seconds (JVM running for 3.669) 
Exception in thread "main" org.springframework.messaging.MessagingException: Dispatcher failed to deliver Message; nested exception is org.springframework.messaging.MessagingException: Failed to connect; nested exception is Unable to connect to server (32103) - java.net.ConnectException: Connection refused 
    at org.springframework.integration.dispatcher.AbstractDispatcher.wrapExceptionIfNecessary(AbstractDispatcher.java:133) 
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:120) 
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148) 
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121) 
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77) 
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423) 
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373) 
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) 
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) 
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) 
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(AbstractMessageSendingTemplate.java:143) 
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(AbstractMessageSendingTemplate.java:135) 
    at org.springframework.integration.gateway.MessagingGatewaySupport.send(MessagingGatewaySupport.java:375) 
    at org.springframework.integration.gateway.GatewayProxyFactoryBean.invokeGatewayMethod(GatewayProxyFactoryBean.java:477) 
    at org.springframework.integration.gateway.GatewayProxyFactoryBean.doInvoke(GatewayProxyFactoryBean.java:429) 
    at org.springframework.integration.gateway.GatewayProxyFactoryBean.invoke(GatewayProxyFactoryBean.java:420) 
    at org.springframework.integration.gateway.GatewayCompletableFutureProxyFactoryBean.invoke(GatewayCompletableFutureProxyFactoryBean.java:65) 
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179) 
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:213) 
    at com.sun.proxy.$Proxy40.sendToMqtt(Unknown Source) 
    at de.afarber.mqttoutbound.MqttJavaApplication.main(MqttJavaApplication.java:27) 
Caused by: org.springframework.messaging.MessagingException: Failed to connect; nested exception is Unable to connect to server (32103) - java.net.ConnectException: Connection refused 
    at org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler.checkConnection(MqttPahoMessageHandler.java:180) 
    at org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler.publish(MqttPahoMessageHandler.java:189) 
    at org.springframework.integration.mqtt.outbound.AbstractMqttMessageHandler.handleMessageInternal(AbstractMqttMessageHandler.java:150) 
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) 
    at org.springframework.integration.config.annotation.ServiceActivatorAnnotationPostProcessor$ReplyProducingMessageHandlerWrapper.handleRequestMessage(ServiceActivatorAnnotationPostProcessor.java:98) 
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109) 
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) 
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) 
    ... 19 more 
Caused by: Unable to connect to server (32103) - java.net.ConnectException: Connection refused 
    at org.eclipse.paho.client.mqttv3.internal.TCPNetworkModule.start(TCPNetworkModule.java:79) 
    at org.eclipse.paho.client.mqttv3.internal.ClientComms$ConnectBG.run(ClientComms.java:590) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: java.net.ConnectException: Connection refused 
    at java.net.PlainSocketImpl.socketConnect(Native Method) 
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:345) 
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206) 
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188) 
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) 
    at java.net.Socket.connect(Socket.java:589) 
    at org.eclipse.paho.client.mqttv3.internal.TCPNetworkModule.start(TCPNetworkModule.java:70) 
    ... 2 more 
2016-10-11 21:53:39.203 INFO 2102 --- [  Thread-1] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@35a50a4c: startup date [Tue Oct 11 21:53:36 CEST 2016]; root of context hierarchy 
2016-10-11 21:53:39.207 INFO 2102 --- [  Thread-1] o.s.c.support.DefaultLifecycleProcessor : Stopping beans in phase 0 
2016-10-11 21:53:39.209 INFO 2102 --- [  Thread-1] ProxyFactoryBean$MethodInvocationGateway : stopped mqttJavaApplication$MyGateway 
2016-10-11 21:53:39.210 INFO 2102 --- [  Thread-1] GatewayCompletableFutureProxyFactoryBean : stopped mqttJavaApplication$MyGateway 
2016-10-11 21:53:39.210 INFO 2102 --- [  Thread-1] o.s.i.endpoint.EventDrivenConsumer  : Removing {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel 
2016-10-11 21:53:39.210 INFO 2102 --- [  Thread-1] o.s.i.channel.PublishSubscribeChannel : Channel 'application.errorChannel' has 0 subscriber(s). 
2016-10-11 21:53:39.210 INFO 2102 --- [  Thread-1] o.s.i.endpoint.EventDrivenConsumer  : stopped _org.springframework.integration.errorLogger 
2016-10-11 21:53:39.210 INFO 2102 --- [  Thread-1] o.s.c.support.DefaultLifecycleProcessor : Stopping beans in phase -2147483648 
2016-10-11 21:53:39.210 INFO 2102 --- [  Thread-1] o.s.i.endpoint.EventDrivenConsumer  : Removing {message-handler:mqttJavaApplication.mqttOutbound.serviceActivator} as a subscriber to the 'mqttOutboundChannel' channel 
2016-10-11 21:53:39.210 INFO 2102 --- [  Thread-1] o.s.integration.channel.DirectChannel : Channel 'application.mqttOutboundChannel' has 0 subscriber(s). 
2016-10-11 21:53:39.211 INFO 2102 --- [  Thread-1] o.s.i.endpoint.EventDrivenConsumer  : stopped mqttJavaApplication.mqttOutbound.serviceActivator 
2016-10-11 21:53:39.211 INFO 2102 --- [  Thread-1] o.s.j.e.a.AnnotationMBeanExporter  : Unregistering JMX-exposed beans on shutdown 
2016-10-11 21:53:39.212 INFO 2102 --- [  Thread-1] o.s.s.c.ThreadPoolTaskScheduler   : Shutting down ExecutorService 'taskScheduler' 
------------------------------------------------------------------------ 
BUILD FAILURE 

The failing code from the MqttJavaApplication.java file is copied below:

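package de.afarber.mqttoutbound;

import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

@SpringBootApplication
@IntegrationComponentScan
public class MqttJavaApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext context =
                new SpringApplicationBuilder(MqttJavaApplication.class)
                        .web(false)
                        .run(args);
        MyGateway gateway = context.getBean(MyGateway.class);
        gateway.sendToMqtt("foo"); // THROWS THE ABOVE EXCEPTION
    }

    // Paho client factory pointing at the broker expected on localhost:1883
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setServerURIs("tcp://localhost:1883");
        return factory;
    }

    // Outbound handler: publishes messages from mqttOutboundChannel to the broker
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler =
                new MqttPahoMessageHandler("testClient", mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic("testTopic");
        return messageHandler;
    }

    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    // Gateway interface; Spring Integration generates the proxy implementation
    @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
    public interface MyGateway {
        void sendToMqtt(String data);
    }
}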

and this appears to be an MQTT client anyway...

How do I implement an MQTT server (broker) as a Spring Integration component in Java, and where should I start?

Answers

6

Spring Integration does not provide a broker; it provides clients for sending and receiving messages.

+0

Gary, Spring Integration provides an HTTP server, so I assumed that creating an MQTT broker would be possible too?

+1

Spring Integration does __not__ provide an HTTP server either; it can run in a web container such as Tomcat and, in that way, expose HTTP endpoints.

+0

Are these HTTP endpoints "clients", or can they be "servers"? (Perhaps this is a very silly question)

2

The top of the error message says:

java.net.ConnectException: Connection refused 

This means that the host you configured as the broker (localhost) either has no broker running or has a firewall enabled that is blocking the connection.

A firewall is unlikely to block connections to localhost, so you need to check whether a broker is actually running.

If you do not have a broker, you need to install one; the simplest is probably Mosquitto, available here:
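One quick way to check is to try opening a plain TCP connection to the broker address from the question (localhost:1883). The snippet below is only a rough sketch using the standard java.net classes; the class name BrokerProbe and the isReachable helper are made up for illustration and are not part of Spring Integration or Paho. A successful connect only shows that something is listening on the port, not that it speaks MQTT.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper, not part of any library: probes a TCP port.
public class BrokerProbe {

    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // "Connection refused" and timeouts both end up here.
            return false;
        }
    }

    public static void main(String[] args) {
        // Host and port are taken from factory.setServerURIs("tcp://localhost:1883").
        boolean up = isReachable("localhost", 1883, 2000);
        System.out.println(up
                ? "Something is listening on localhost:1883"
                : "Nothing is listening on localhost:1883, start a broker first");
    }
}
```

If this reports that nothing is listening, the gateway call in the question will keep failing with Connection refused until a broker is started on that port.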

https://mosquitto.org/download/

Other brokers are available, and a list can be found here:

https://github.com/mqtt/mqtt.github.io/wiki/servers

+1

'> How to implement an MQTT server' Spring Integration does not provide broker functionality; it is a client.