Я не могу заставить OpenNMS отправлять сообщения в конечную точку AMQP. У меня никогда не было этой работы, и это первый раз, когда я использовал OpenNMS и AMQP, так что это могло бы свести мою неопытность.OpenNMS v18 AMQP Сообщение Отправка сообщения
Я настроил RabbitMQ 3.5.7 и протестировал его согласно этому question. Он отлично работает при использовании внешнего клиента QPID 0.32, а также отлично работает при использовании python или perl. Определение прекрасных работ заключается в том, что связь установлена, и полезная нагрузка сообщения передается в обмен, а затем передается в бэкэнд-очередь. Сообщение можно просмотреть в графическом интерфейсе администратора RabbitMQ.
В OpenNMS я следил за инструкциями here Начал с EventForwarder, а затем попробовал AlarmNorthbounder и получил исключение NullPointerException.
настроить Karaf следующим образом настройка свойств с помощью этих операторов: -
opennms> config:edit org.opennms.features.amqp.alarmnorthbounder
opennms> propset connectionUrl amqp://simon:[email protected]/test?brokerlist=\'localhost:5672\'
opennms> propset destination "amqp:onms3/Simon;{'create':'always','node':{'type':'topic'} }"
opennms> propset processorName default-alarm-northbounder-processor
opennms> config:update
opennms> config:list '(service.pid=org.opennms.features.amqp.alarmnorthbounder)'
----------------------------------------------------------------
Pid: org.opennms.features.amqp.alarmnorthbounder
BundleLocation: mvn:org.opennms.features.amqp/org.opennms.features.amqp.alarm-northbounder/18.0.0
Properties:
connectionUrl = amqp://simon:[email protected]/test?brokerlist='localhost:5672'
destination = amqp:onms3/Simon;{'create':'always','node':{'type':'topic'} }
felix.fileinstall.filename = file:/usr/share/opennms/etc/org.opennms.features.amqp.alarmnorthbounder.cfg
processorName = default-alarm-northbounder-processor
service.pid = org.opennms.features.amqp.alarmnorthbounder
Я получаю следующее сообщение об ошибке в журнале
Message History
---------------------------------------------------------------------------------------------------------------------------------------
RouteId ProcessorId Processor Elapsed (ms)
[forwardAlarm ] [forwardAlarm ] [seda://forwardAlarm ] [ 8]
[forwardAlarm ] [convertBodyTo3 ] [convertBodyTo[org.opennms.netmgt.alarmd.api.NorthboundAlarm] ] [ 0]
[forwardAlarm ] [log3 ] [log ] [ 1]
[forwardAlarm ] [bean3 ] [bean[ref:dynamicallyTrackedProcessor] ] [ 0]
[forwardAlarm ] [to3 ] [amqp:onms3/Simon;{'create':'always','node':{'type':'topic'} } ] [ 7]
Exchange
---------------------------------------------------------------------------------------------------------------------------------------
Exchange[
Id ID-ubuntu-1604-35241-1465752556843-2-60
ExchangePattern InOnly
Headers {breadcrumbId=ID-ubuntu-1604-35241-1465752556843-2-58, CamelRedelivered=false, CamelRedeliveryCounter=0}
BodyType String
Body NorthboundAlarm[id=3, uei='uei.opennms.org/generic/traps/EnterpriseDefault', nodeId=1]
]
Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------
java.lang.NullPointerException
at org.apache.qpid.client.BasicMessageProducer_0_8.declareDestination(BasicMessageProducer_0_8.java:63)[212:org.apache.servicemix.bundles.qpid:0.28.0.1]
at org.apache.qpid.client.BasicMessageProducer.<init>(BasicMessageProducer.java:136)[212:org.apache.servicemix.bundles.qpid:0.28.0.1]
at org.apache.qpid.client.BasicMessageProducer_0_8.<init>(BasicMessageProducer_0_8.java:55)[212:org.apache.servicemix.bundles.qpid:0.28.0.1]
at org.apache.qpid.client.AMQSession_0_8.createMessageProducer(AMQSession_0_8.java:559)[212:org.apache.servicemix.bundles.qpid:0.28.0.1]
at org.apache.qpid.client.AMQSession_0_8.createMessageProducer(AMQSession_0_8.java:62)[212:org.apache.servicemix.bundles.qpid:0.28.0.1]
Я ожидаю, что он должен быть в состоянии доставляйте сообщение в очередь, используя те же библиотеки, что и я, используя внешнюю часть.
Ввод DEBUG на для org.apache.qpid я получаю: -
2016-06-12 19:00:38,725 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [IoReceiver - localhost/127.0.0.1:5672] org.apache.qpid.client.state.AMQStateManager: Notififying State change to 1 : [[email protected]]
2016-06-12 19:00:38,725 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [IoReceiver - localhost/127.0.0.1:5672] org.apache.qpid.framing.FieldTable: FieldTable::writeToBuffer: Writing encoded length of 254...
2016-06-12 19:00:38,725 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [IoReceiver - localhost/127.0.0.1:5672] org.apache.qpid.framing.FieldTable: {instance=[LONG_STRING: ubuntu-16041465757105204], product=[LONG_STRING: qpid], version=[LONG_STRING: 0.28], platform=[LONG_STRING: Java(TM) SE Runtime Environment, 1.8.0_45-b14, Oracle Corporation, amd64, Linux, 4.4.0-22-generic, unknown], qpid.client_process=[LONG_STRING: Qpid Java Client], qpid.client_pid=[INT: 1318]}
2016-06-12 19:00:38,725 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [IoReceiver - localhost/127.0.0.1:5672] org.apache.qpid.client.protocol.AMQProtocolHandler: (1404676419)Method frame received: [ConnectionTuneBodyImpl: channelMax=0, frameMax=131072, heartbeat=60]
2016-06-12 19:00:38,726 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [IoReceiver - localhost/127.0.0.1:5672] org.apache.qpid.client.handler.ConnectionTuneMethodHandler: ConnectionTune frame received
2016-06-12 19:00:38,726 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [IoReceiver - localhost/127.0.0.1:5672] org.apache.qpid.client.state.AMQStateManager: State changing to AMQState: id = 3 name: CONNECTION_NOT_OPENED from old state AMQState: id = 2 name: CONNECTION_NOT_TUNED
2016-06-12 19:00:38,726 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [IoReceiver - localhost/127.0.0.1:5672] org.apache.qpid.client.state.AMQStateManager: Notififying State change to 1 : [[email protected]]
2016-06-12 19:00:38,726 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [IoReceiver - localhost/127.0.0.1:5672] org.apache.qpid.client.protocol.AMQProtocolHandler: (1404676419)Method frame received: [ConnectionOpenOkBodyImpl: knownHosts=null]
2016-06-12 19:00:38,726 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [IoReceiver - localhost/127.0.0.1:5672] org.apache.qpid.client.state.AMQStateManager: State changing to AMQState: id = 4 name: CONNECTION_OPEN from old state AMQState: id = 3 name: CONNECTION_NOT_OPENED
2016-06-12 19:00:38,726 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [IoReceiver - localhost/127.0.0.1:5672] org.apache.qpid.client.state.AMQStateManager: Notififying State change to 1 : [[email protected]]
2016-06-12 19:00:38,726 INFO org.apache.servicemix.bundles.qpid:0.28.0.1(212) [Camel (amqpAlarmNorthbounderCamelContext) thread #4 - seda://forwardAlarm] org.apache.qpid.client.AMQConnection: Connection 44 now connected from /127.0.0.1:45388 to localhost/127.0.0.1:5672
2016-06-12 19:00:38,727 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [Camel (amqpAlarmNorthbounderCamelContext) thread #4 - seda://forwardAlarm] org.apache.qpid.client.AMQConnection: Are we connected:true
2016-06-12 19:00:38,727 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [Camel (amqpAlarmNorthbounderCamelContext) thread #4 - seda://forwardAlarm] org.apache.qpid.client.AMQConnection: Connected with ProtocolHandler Version:0-91
2016-06-12 19:00:38,727 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [Camel (amqpAlarmNorthbounderCamelContext) thread #4 - seda://forwardAlarm] org.apache.qpid.client.AMQConnectionDelegate_8_0: Write channel open frame for channel id 1
2016-06-12 19:00:38,727 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [Camel (amqpAlarmNorthbounderCamelContext) thread #4 - seda://forwardAlarm] org.apache.qpid.client.AMQSession: Created session:[email protected]
2016-06-12 19:00:38,728 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [IoReceiver - localhost/127.0.0.1:5672] org.apache.qpid.client.protocol.AMQProtocolHandler: (1404676419)Method frame received: [ChannelOpenOkBodyImpl: channelId=null]
2016-06-12 19:00:38,728 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [IoReceiver - localhost/127.0.0.1:5672] org.apache.qpid.client.protocol.AMQProtocolHandler: (1404676419)Method frame received: [BasicQosOkBodyImpl: ]
2016-06-12 19:00:38,731 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [Camel (amqpAlarmNorthbounderCamelContext) thread #4 - seda://forwardAlarm] org.apache.qpid.client.AMQDestination: Based on onms3/Simon;{'create':'always','node':{'type':'topic'} } the selected destination syntax is ADDR
2016-06-12 19:00:38,731 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [Camel (amqpAlarmNorthbounderCamelContext) thread #4 - seda://forwardAlarm] org.apache.qpid.client.AMQSession: Closing session: [email protected]
2016-06-12 19:00:38,731 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [Camel (amqpAlarmNorthbounderCamelContext) thread #4 - seda://forwardAlarm] org.apache.qpid.client.protocol.AMQProtocolSession: closeSession called on protocol session for session 1
Он закрывает сессию, прежде чем писать какую-либо информацию.
Когда я делать по существу то же самое, от внешнего qpid клиента - выполняется следующим образом: -
#!/bin/bash
rm log.out
java -Dqpid.amqp.version=0-91 -Dlog4j.debug -Dlog4j.configuration=file:./log4j.properties -cp "client/example/target/classes/:client/example/target/dependency/*:slf4j-1.7.21/slf4j-log4j12-1.7.
21.jar:apache-log4j-1.2.17/log4j-1.2.17.jar" \
org.apache.qpid.example.ListSender
я получаю это: -
142 [main] DEBUG org.apache.qpid.client.AMQConnection - Are we connected:true
142 [main] DEBUG org.apache.qpid.client.AMQConnection - Connected with ProtocolHandler Version:0-91
146 [main] DEBUG org.apache.qpid.client.AMQConnectionDelegate_8_0 - Write channel open frame for channel id 1
162 [main] DEBUG org.apache.qpid.client.AMQSession - Created session:[email protected]
164 [IoReceiver - localhost/127.0.0.1:5672] DEBUG org.apache.qpid.client.protocol.AMQProtocolHandler - (333274164)Method frame received: [ChannelOpenOkBody]
165 [IoReceiver - localhost/127.0.0.1:5672] DEBUG org.apache.qpid.client.protocol.AMQProtocolHandler - (333274164)Method frame received: [BasicQosOkBodyImpl: ]
173 [main] DEBUG org.apache.qpid.client.AMQDestination - Based on onms3/Simon;{create: always, node:{type: topic } } the selected destination syntax is ADDR
177 [main] DEBUG org.apache.qpid.framing.FieldTable - FieldTable::writeToBuffer: Writing encoded length of 0...
178 [IoReceiver - localhost/127.0.0.1:5672] DEBUG org.apache.qpid.client.protocol.AMQProtocolHandler - (333274164)Method frame received: [ExchangeDeclareOkBodyImpl: ]
179 [IoReceiver - localhost/127.0.0.1:5672] DEBUG org.apache.qpid.client.protocol.AMQProtocolHandler - (333274164)Method frame received: [ExchangeDeclareOkBodyImpl: ]
179 [IoReceiver - localhost/127.0.0.1:5672] DEBUG org.apache.qpid.client.protocol.AMQProtocolHandler - (333274164)Method frame received: [ExchangeDeclareOkBodyImpl: ]
180 [main] DEBUG org.apache.qpid.client.BasicMessageProducer_0_8 - MessageProducer [email protected] using publish mode : ASYNC_PUBLISH_ALL
190 [main] DEBUG org.apache.qpid.client.BasicMessageProducer_0_8 - Sending content body frames to 'onms3'/'Simon'; {
'create': 'always',
'node': {
'type': 'topic'
}
}
190 [main] DEBUG org.apache.qpid.client.BasicMessageProducer_0_8 - Sending content header frame to 'onms3'/'Simon'; {
'create': 'always',
'node': {
'type': 'topic'
}
}
190 [main] DEBUG org.apache.qpid.framing.FieldTable - FieldTable::writeToBuffer: Writing encoded length of 90...
191 [main] DEBUG org.apache.qpid.framing.FieldTable - {Id=[INT: 987654321], name=[LONG_STRING: WidgetSimon], price=[DOUBLE: 0.99], qpid.subject=[LONG_STRING: Simon], JMS_QPID_DESTTYPE=[INT: 2]}
192 [main] DEBUG org.apache.qpid.client.AMQSession - Closing session: [email protected]
192 [main] DEBUG org.apache.qpid.client.protocol.AMQProtocolSession - closeSession called on protocol session for session 1
194 [IoReceiver - localhost/127.0.0.1:5672] DEBUG org.apache.qpid.client.protocol.AMQProtocolHandler - (333274164)Method frame received: [ChannelCloseOkBody]
194 [IoReceiver - localhost/127.0.0.1:5672] INFO org.apache.qpid.client.handler.ChannelCloseOkMethodHandler - Received channel-close-ok for channel-id 1
195 [IoReceiver - localhost/127.0.0.1:5672] DEBUG org.apache.qpid.client.protocol.AMQProtocolHandler - (333274164)Method frame received: [ConnectionCloseOkBody]
196 [main] DEBUG org.apache.qpid.client.protocol.AMQProtocolHandler - Session closed called by client
ListSender.java следующим образом: -
package org.apache.qpid.example;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.qpid.client.AMQAnyDestination;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.jms.ListMessage;
public class ListSender {
public static void main(String[] args) throws Exception
{
Connection connection =
new AMQConnection("amqp://simon:[email protected]/test?brokerlist='tcp://localhost:5672'");
AMQShortString a1 = new AMQShortString("");
AMQShortString a2 = new AMQShortString("");
AMQShortString[] bindvars = new AMQShortString[]{a1,a2};
boolean is_durable = true;
/*
Destination queue = new AMQAnyDestination(new AMQShortString("onms2"),
new AMQShortString("direct"),
new AMQShortString("Simon"),
true,
true,
new AMQShortString(""),
false,
bindvars);
*/
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//Destination queue = new AMQAnyDestination("onms3/Simon");
Destination queue = new AMQAnyDestination("onms3/Simon;{create: always, node:{type: topic } }");
//Destination queue = new AMQAnyDestination("onms3/Simon;%7Bcreate%3A%20always%2C%20node%3A%7Btype%3A%20topic%20%7D%20%7D");
//Destination queue = new AMQAnyDestination("amqp:OpenNMSExchange/Taylor; {create: always}");
//Destination queue = new AMQAnyDestination("OpenNMSExchange; {create: always}");
MessageProducer producer = session.createProducer(queue);
ListMessage m = ((org.apache.qpid.jms.Session)session).createListMessage();
m.setIntProperty("Id", 987654321);
m.setStringProperty("name", "WidgetSimon");
m.setDoubleProperty("price", 0.99);
List<String> colors = new ArrayList<String>();
colors.add("red");
colors.add("green");
colors.add("white");
m.add(colors);
Map<String,Double> dimensions = new HashMap<String,Double>();
dimensions.put("length",10.2);
dimensions.put("width",5.1);
dimensions.put("depth",2.0);
m.add(dimensions);
List<List<Integer>> parts = new ArrayList<List<Integer>>();
parts.add(Arrays.asList(new Integer[] {1,2,5}));
parts.add(Arrays.asList(new Integer[] {8,2,5}));
m.add(parts);
Map<String,Object> specs = new HashMap<String,Object>();
specs.put("colours", colors);
specs.put("dimensions", dimensions);
specs.put("parts", parts);
m.add(specs);
producer.send((Message)m);
System.out.println("Sent: " + m);
connection.close();
}
}
Мое предположение состояло в том, что это была проблема подключения к серверу AMQP некоторого описания. Столкнувшись с рядом проблем во время устранения неполадок, если я столкнулся с проблемой из внешней банки, из журналов было ясно, в чем проблема. Это проблема с OpenNMS? У кого-нибудь это работает успешно? Любые идеи?
Приветствия
Simon
Извините, как я могу просмотреть журнал OpenNMS org.opennms.features.amqp.alarmnorthbounder? – sintetico82