Мне действительно нужен ответ на этот вопрос, поэтому я его редактирую.Java Qpid Proton - брокер ActiveMQ Невозможно назначить запрошенный адрес: bind
У меня есть брокер Apache ActiveMQ построен в моей связи с использованием этого кода
Broker.java
общественного класса Брокер {
private BrokerService broker;
public Broker(String connector) {
this.broker = new BrokerService();
this.broker.setUseJmx(true);
try {
this.broker.addConnector(connector);
} catch (URISyntaxException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
}
public void addConnector(String connector){
try {
this.broker.addConnector(connector);
} catch (Exception e) {
e.printStackTrace();
}
}
public void start() {
try {
this.broker.start();
} catch (Exception e) {
e.printStackTrace();
}
}
public BrokerService getBroker() {
return broker;
}
public void setBroker(BrokerService broker) {
this.broker = broker;
}
}
Вот моя проблема
Я использую библиотеку Qpid Proton (см. Здесь: Qpid Proton). У меня есть один класс, чтобы прочитать данные, которые почти пример они дают вам на qpid Webiste
package messaging;
import java.io.IOException;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.engine.BaseHandler;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.message.Message;
import org.apache.qpid.proton.reactor.FlowController;
import org.apache.qpid.proton.reactor.Handshaker;
public class AMQPSubscriber extends BaseHandler {
private String broker;
private String topic;
private String port;
public AMQPSubscriber(String broker, String port, String topic) {
this.broker = broker;
this.port = port;
this.topic = topic;
this.add(new Handshaker());
this.add(new FlowController());
}
@Override
public void onReactorInit(Event event) {
try {
event.getReactor().acceptor(broker, Integer.parseInt(port), new AMQPSubscriber(broker, port, topic));
} catch (NumberFormatException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void onDelivery(Event event) {
System.out.println("---------Message Received--------");
Receiver recv = (Receiver) event.getLink();
Delivery delivery = recv.current();
if (delivery.isReadable() && !delivery.isPartial()) {
int size = delivery.pending();
byte[] buffer = new byte[size];
int read = recv.recv(buffer, 0, buffer.length);
recv.advance();
Message msg = Proton.message();
msg.decode(buffer, 0, read);
System.out.println("Subject : " + msg.getProperties().getSubject());
System.out.println("Text : " + ((AmqpValue) msg.getBody()).getValue());
}
}
}
Этот класс называется в главном:
public static void main (String[]args) throws IOException, TimeoutException, InterruptedException{
Broker broker = new Broker("amqp://" + host + ":" + AMQPport);
broker.start();
AMQPSubscriber receiv = new AMQPSubscriber(host, "5672", topic);
Reactor r;
try {
r = Proton.reactor(receiv);
r.run();
} catch (IOException e) {
e.printStackTrace();
}
}
Но когда я исполняю этот код, я получаю
INFO | Loaded the Bouncy Castle security provider.
INFO | Using Persistence Adapter: KahaDBPersistenceAdapter[C:\Users\alexi\Documents\workspace-sts-3.7.3.RELEASE\IOT\activemq-data\localhost\KahaDB]
INFO | JMX consoles can connect to service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi
INFO | KahaDB is version 6
INFO | Recovering from the journal @1:61115
INFO | Recovery replayed 11 operations from the journal in 0.014 seconds.
INFO | PListStore:[C:\Users\alexi\Documents\workspace-sts-3.7.3.RELEASE\IOT\activemq-data\localhost\tmp_storage] started
INFO | Apache ActiveMQ 5.13.3 (localhost, ID:DESKTOP-UK0JIC4-52783-1467025817901-0:1) is starting
INFO | Listening for connections at: amqp://127.0.0.1:5672
INFO | Connector amqp://127.0.0.1:5672 started
INFO | Apache ActiveMQ 5.13.3 (localhost, ID:DESKTOP-UK0JIC4-52783-1467025817901-0:1) started
INFO | For help or more information please see: http://activemq.apache.org
WARN | Store limit is 102400 mb (current store usage is 0 mb). The data directory: C:\Users\alexi\Documents\workspace-sts-3.7.3.RELEASE\IOT\activemq-data\localhost\KahaDB only has 7792 mb of usable space. - resetting to maximum available disk space: 7792 mb
WARN | Temporary Store limit is 51200 mb (current store usage is 0 mb). The data directory: C:\Users\alexi\Documents\workspace-sts-3.7.3.RELEASE\IOT\activemq-data\localhost only has 7792 mb of usable space. - resetting to maximum available disk space: 7792 mb
java.net.BindException: Address already in use: bind
at sun.nio.ch.Net.bind0(Native Method)
at sun.nio.ch.Net.bind(Unknown Source)
at sun.nio.ch.Net.bind(Unknown Source)
at sun.nio.ch.ServerSocketChannelImpl.bind(Unknown Source)
at java.nio.channels.ServerSocketChannel.bind(Unknown Source)
at org.apache.qpid.proton.reactor.impl.AcceptorImpl.<init>(AcceptorImpl.java:102)
at org.apache.qpid.proton.reactor.impl.ReactorImpl.acceptor(ReactorImpl.java:477)
at messaging.AMQPSubscriber.onReactorInit(AMQPSubscriber.java:33)
at org.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:209)
at org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108)
at org.apache.qpid.proton.engine.impl.EventImpl.delegate(EventImpl.java:129)
at org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:114)
at org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:307)
at org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:275)
at org.apache.qpid.proton.reactor.impl.ReactorImpl.run(ReactorImpl.java:343)
at messaging.Main.main(Main.java:98)
Этот брокер отлично работает, когда я использую MQTT и Paho, мне бы хотелось, чтобы он также работал с AMQP. Я знаю, что bind означает, что порт уже используется, но я не могу понять, как я мог бы прослушивать порт, где не отправляются данные.
Спасибо, что помогли мне.
Alexi
«192.168.100.47» должен быть вашим местным IP-адресом? Если да, то уверены ли вы? Также проверьте файл 'hosts', чтобы узнать, есть ли какая-либо запись для' localhost', кроме '127.0.0.1' – Bohemian
. Этот IP-адрес - это другой адрес компьютера, который является сетью. –