2016-06-12 7 views
1

Я не могу заставить OpenNMS отправлять сообщения в конечную точку AMQP. У меня никогда не было этой работы, и это первый раз, когда я использовал OpenNMS и AMQP, так что это могло бы свести мою неопытность.OpenNMS v18 AMQP Сообщение Отправка сообщения

Я настроил RabbitMQ 3.5.7 и протестировал его согласно этому question. Он отлично работает при использовании внешнего клиента QPID 0.32, а также отлично работает при использовании python или perl. Определение прекрасных работ заключается в том, что связь установлена, и полезная нагрузка сообщения передается в обмен, а затем передается в бэкэнд-очередь. Сообщение можно просмотреть в графическом интерфейсе администратора RabbitMQ.

В OpenNMS я следил за инструкциями here Начал с EventForwarder, а затем попробовал AlarmNorthbounder и получил исключение NullPointerException.

настроить Karaf следующим образом настройка свойств с помощью этих операторов: -

opennms> config:edit org.opennms.features.amqp.alarmnorthbounder 
opennms> propset connectionUrl amqp://simon:[email protected]/test?brokerlist=\'localhost:5672\' 
opennms> propset destination "amqp:onms3/Simon;{'create':'always','node':{'type':'topic'} }" 
opennms> propset processorName default-alarm-northbounder-processor 
opennms> config:update 
opennms> config:list '(service.pid=org.opennms.features.amqp.alarmnorthbounder)' 
---------------------------------------------------------------- 
Pid:   org.opennms.features.amqp.alarmnorthbounder 
BundleLocation: mvn:org.opennms.features.amqp/org.opennms.features.amqp.alarm-northbounder/18.0.0 
Properties: 
    connectionUrl = amqp://simon:[email protected]/test?brokerlist='localhost:5672' 
    destination = amqp:onms3/Simon;{'create':'always','node':{'type':'topic'} } 
    felix.fileinstall.filename = file:/usr/share/opennms/etc/org.opennms.features.amqp.alarmnorthbounder.cfg 
    processorName = default-alarm-northbounder-processor 
    service.pid = org.opennms.features.amqp.alarmnorthbounder 

Я получаю следующее сообщение об ошибке в журнале

Message History 
--------------------------------------------------------------------------------------------------------------------------------------- 
RouteId    ProcessorId   Processor                  Elapsed (ms) 
[forwardAlarm  ] [forwardAlarm  ] [seda://forwardAlarm               ] [   8] 
[forwardAlarm  ] [convertBodyTo3 ] [convertBodyTo[org.opennms.netmgt.alarmd.api.NorthboundAlarm]     ] [   0] 
[forwardAlarm  ] [log3    ] [log                   ] [   1] 
[forwardAlarm  ] [bean3    ] [bean[ref:dynamicallyTrackedProcessor]           ] [   0] 
[forwardAlarm  ] [to3    ] [amqp:onms3/Simon;{'create':'always','node':{'type':'topic'} }     ] [   7] 

Exchange 
--------------------------------------------------------------------------------------------------------------------------------------- 
Exchange[ 
     Id     ID-ubuntu-1604-35241-1465752556843-2-60 
     ExchangePattern  InOnly 
     Headers    {breadcrumbId=ID-ubuntu-1604-35241-1465752556843-2-58, CamelRedelivered=false, CamelRedeliveryCounter=0} 
     BodyType   String 
     Body    NorthboundAlarm[id=3, uei='uei.opennms.org/generic/traps/EnterpriseDefault', nodeId=1] 
] 

Stacktrace 
--------------------------------------------------------------------------------------------------------------------------------------- 
java.lang.NullPointerException 
     at org.apache.qpid.client.BasicMessageProducer_0_8.declareDestination(BasicMessageProducer_0_8.java:63)[212:org.apache.servicemix.bundles.qpid:0.28.0.1] 
     at org.apache.qpid.client.BasicMessageProducer.<init>(BasicMessageProducer.java:136)[212:org.apache.servicemix.bundles.qpid:0.28.0.1] 
     at org.apache.qpid.client.BasicMessageProducer_0_8.<init>(BasicMessageProducer_0_8.java:55)[212:org.apache.servicemix.bundles.qpid:0.28.0.1] 
     at org.apache.qpid.client.AMQSession_0_8.createMessageProducer(AMQSession_0_8.java:559)[212:org.apache.servicemix.bundles.qpid:0.28.0.1] 
     at org.apache.qpid.client.AMQSession_0_8.createMessageProducer(AMQSession_0_8.java:62)[212:org.apache.servicemix.bundles.qpid:0.28.0.1] 

Я ожидаю, что он должен быть в состоянии доставляйте сообщение в очередь, используя те же библиотеки, что и я, используя внешнюю часть.

Ввод DEBUG на для org.apache.qpid я получаю: -

2016-06-12 19:00:38,725 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [IoReceiver - localhost/127.0.0.1:5672] org.apache.qpid.client.state.AMQStateManager: Notififying State change to 1 : [[email protected]] 
2016-06-12 19:00:38,725 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [IoReceiver - localhost/127.0.0.1:5672] org.apache.qpid.framing.FieldTable: FieldTable::writeToBuffer: Writing encoded length of 254... 
2016-06-12 19:00:38,725 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [IoReceiver - localhost/127.0.0.1:5672] org.apache.qpid.framing.FieldTable: {instance=[LONG_STRING: ubuntu-16041465757105204], product=[LONG_STRING: qpid], version=[LONG_STRING: 0.28], platform=[LONG_STRING: Java(TM) SE Runtime Environment, 1.8.0_45-b14, Oracle Corporation, amd64, Linux, 4.4.0-22-generic, unknown], qpid.client_process=[LONG_STRING: Qpid Java Client], qpid.client_pid=[INT: 1318]} 
2016-06-12 19:00:38,725 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [IoReceiver - localhost/127.0.0.1:5672] org.apache.qpid.client.protocol.AMQProtocolHandler: (1404676419)Method frame received: [ConnectionTuneBodyImpl: channelMax=0, frameMax=131072, heartbeat=60] 
2016-06-12 19:00:38,726 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [IoReceiver - localhost/127.0.0.1:5672] org.apache.qpid.client.handler.ConnectionTuneMethodHandler: ConnectionTune frame received 
2016-06-12 19:00:38,726 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [IoReceiver - localhost/127.0.0.1:5672] org.apache.qpid.client.state.AMQStateManager: State changing to AMQState: id = 3 name: CONNECTION_NOT_OPENED from old state AMQState: id = 2 name: CONNECTION_NOT_TUNED 
2016-06-12 19:00:38,726 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [IoReceiver - localhost/127.0.0.1:5672] org.apache.qpid.client.state.AMQStateManager: Notififying State change to 1 : [[email protected]] 
2016-06-12 19:00:38,726 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [IoReceiver - localhost/127.0.0.1:5672] org.apache.qpid.client.protocol.AMQProtocolHandler: (1404676419)Method frame received: [ConnectionOpenOkBodyImpl: knownHosts=null] 
2016-06-12 19:00:38,726 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [IoReceiver - localhost/127.0.0.1:5672] org.apache.qpid.client.state.AMQStateManager: State changing to AMQState: id = 4 name: CONNECTION_OPEN from old state AMQState: id = 3 name: CONNECTION_NOT_OPENED 
2016-06-12 19:00:38,726 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [IoReceiver - localhost/127.0.0.1:5672] org.apache.qpid.client.state.AMQStateManager: Notififying State change to 1 : [[email protected]] 
2016-06-12 19:00:38,726 INFO org.apache.servicemix.bundles.qpid:0.28.0.1(212) [Camel (amqpAlarmNorthbounderCamelContext) thread #4 - seda://forwardAlarm] org.apache.qpid.client.AMQConnection: Connection 44 now connected from /127.0.0.1:45388 to localhost/127.0.0.1:5672 
2016-06-12 19:00:38,727 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [Camel (amqpAlarmNorthbounderCamelContext) thread #4 - seda://forwardAlarm] org.apache.qpid.client.AMQConnection: Are we connected:true 
2016-06-12 19:00:38,727 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [Camel (amqpAlarmNorthbounderCamelContext) thread #4 - seda://forwardAlarm] org.apache.qpid.client.AMQConnection: Connected with ProtocolHandler Version:0-91 
2016-06-12 19:00:38,727 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [Camel (amqpAlarmNorthbounderCamelContext) thread #4 - seda://forwardAlarm] org.apache.qpid.client.AMQConnectionDelegate_8_0: Write channel open frame for channel id 1 
2016-06-12 19:00:38,727 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [Camel (amqpAlarmNorthbounderCamelContext) thread #4 - seda://forwardAlarm] org.apache.qpid.client.AMQSession: Created session:[email protected] 
2016-06-12 19:00:38,728 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [IoReceiver - localhost/127.0.0.1:5672] org.apache.qpid.client.protocol.AMQProtocolHandler: (1404676419)Method frame received: [ChannelOpenOkBodyImpl: channelId=null] 
2016-06-12 19:00:38,728 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [IoReceiver - localhost/127.0.0.1:5672] org.apache.qpid.client.protocol.AMQProtocolHandler: (1404676419)Method frame received: [BasicQosOkBodyImpl: ] 
2016-06-12 19:00:38,731 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [Camel (amqpAlarmNorthbounderCamelContext) thread #4 - seda://forwardAlarm] org.apache.qpid.client.AMQDestination: Based on onms3/Simon;{'create':'always','node':{'type':'topic'} } the selected destination syntax is ADDR 
2016-06-12 19:00:38,731 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [Camel (amqpAlarmNorthbounderCamelContext) thread #4 - seda://forwardAlarm] org.apache.qpid.client.AMQSession: Closing session: [email protected] 
2016-06-12 19:00:38,731 DEBUG org.apache.servicemix.bundles.qpid:0.28.0.1(212) [Camel (amqpAlarmNorthbounderCamelContext) thread #4 - seda://forwardAlarm] org.apache.qpid.client.protocol.AMQProtocolSession: closeSession called on protocol session for session 1 

Он закрывает сессию, прежде чем писать какую-либо информацию.

Когда я делать по существу то же самое, от внешнего qpid клиента - выполняется следующим образом: -

#!/bin/bash 

rm log.out 
java -Dqpid.amqp.version=0-91 -Dlog4j.debug -Dlog4j.configuration=file:./log4j.properties -cp "client/example/target/classes/:client/example/target/dependency/*:slf4j-1.7.21/slf4j-log4j12-1.7. 
21.jar:apache-log4j-1.2.17/log4j-1.2.17.jar" \ 
    org.apache.qpid.example.ListSender 

я получаю это: -

142 [main] DEBUG org.apache.qpid.client.AMQConnection - Are we connected:true 
142 [main] DEBUG org.apache.qpid.client.AMQConnection - Connected with ProtocolHandler Version:0-91 
146 [main] DEBUG org.apache.qpid.client.AMQConnectionDelegate_8_0 - Write channel open frame for channel id 1 
162 [main] DEBUG org.apache.qpid.client.AMQSession - Created session:[email protected] 
164 [IoReceiver - localhost/127.0.0.1:5672] DEBUG org.apache.qpid.client.protocol.AMQProtocolHandler - (333274164)Method frame received: [ChannelOpenOkBody] 
165 [IoReceiver - localhost/127.0.0.1:5672] DEBUG org.apache.qpid.client.protocol.AMQProtocolHandler - (333274164)Method frame received: [BasicQosOkBodyImpl: ] 
173 [main] DEBUG org.apache.qpid.client.AMQDestination - Based on onms3/Simon;{create: always, node:{type: topic } } the selected destination syntax is ADDR 
177 [main] DEBUG org.apache.qpid.framing.FieldTable - FieldTable::writeToBuffer: Writing encoded length of 0... 
178 [IoReceiver - localhost/127.0.0.1:5672] DEBUG org.apache.qpid.client.protocol.AMQProtocolHandler - (333274164)Method frame received: [ExchangeDeclareOkBodyImpl: ] 
179 [IoReceiver - localhost/127.0.0.1:5672] DEBUG org.apache.qpid.client.protocol.AMQProtocolHandler - (333274164)Method frame received: [ExchangeDeclareOkBodyImpl: ] 
179 [IoReceiver - localhost/127.0.0.1:5672] DEBUG org.apache.qpid.client.protocol.AMQProtocolHandler - (333274164)Method frame received: [ExchangeDeclareOkBodyImpl: ] 
180 [main] DEBUG org.apache.qpid.client.BasicMessageProducer_0_8 - MessageProducer [email protected] using publish mode : ASYNC_PUBLISH_ALL 
190 [main] DEBUG org.apache.qpid.client.BasicMessageProducer_0_8 - Sending content body frames to 'onms3'/'Simon'; { 
    'create': 'always', 
    'node': { 
    'type': 'topic' 
    } 
} 
190 [main] DEBUG org.apache.qpid.client.BasicMessageProducer_0_8 - Sending content header frame to 'onms3'/'Simon'; { 
    'create': 'always', 
    'node': { 
    'type': 'topic' 
    } 
} 
190 [main] DEBUG org.apache.qpid.framing.FieldTable - FieldTable::writeToBuffer: Writing encoded length of 90... 
191 [main] DEBUG org.apache.qpid.framing.FieldTable - {Id=[INT: 987654321], name=[LONG_STRING: WidgetSimon], price=[DOUBLE: 0.99], qpid.subject=[LONG_STRING: Simon], JMS_QPID_DESTTYPE=[INT: 2]} 
192 [main] DEBUG org.apache.qpid.client.AMQSession - Closing session: [email protected] 
192 [main] DEBUG org.apache.qpid.client.protocol.AMQProtocolSession - closeSession called on protocol session for session 1 
194 [IoReceiver - localhost/127.0.0.1:5672] DEBUG org.apache.qpid.client.protocol.AMQProtocolHandler - (333274164)Method frame received: [ChannelCloseOkBody] 
194 [IoReceiver - localhost/127.0.0.1:5672] INFO org.apache.qpid.client.handler.ChannelCloseOkMethodHandler - Received channel-close-ok for channel-id 1 
195 [IoReceiver - localhost/127.0.0.1:5672] DEBUG org.apache.qpid.client.protocol.AMQProtocolHandler - (333274164)Method frame received: [ConnectionCloseOkBody] 
196 [main] DEBUG org.apache.qpid.client.protocol.AMQProtocolHandler - Session closed called by client 

ListSender.java следующим образом: -

package org.apache.qpid.example; 

import java.util.ArrayList; 
import java.util.Arrays; 
import java.util.HashMap; 
import java.util.List; 
import java.util.Map; 

import javax.jms.Connection; 
import javax.jms.Destination; 
import javax.jms.Message; 
import javax.jms.MessageProducer; 
import javax.jms.Session; 

import org.apache.qpid.client.AMQAnyDestination; 
import org.apache.qpid.client.AMQConnection; 

import org.apache.qpid.framing.AMQShortString; 
import org.apache.qpid.jms.ListMessage; 


public class ListSender { 

    public static void main(String[] args) throws Exception 
    { 
     Connection connection = 
      new AMQConnection("amqp://simon:[email protected]/test?brokerlist='tcp://localhost:5672'"); 
                AMQShortString a1 = new AMQShortString(""); 
                AMQShortString a2 = new AMQShortString(""); 
     AMQShortString[] bindvars = new AMQShortString[]{a1,a2}; 
     boolean is_durable = true; 
/*  
     Destination queue = new AMQAnyDestination(new AMQShortString("onms2"), 
                new AMQShortString("direct"), 
                new AMQShortString("Simon"), 
                true,   
                true,   
                new AMQShortString(""), 
                false,  
                bindvars); 
     */ 

     Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
     //Destination queue = new AMQAnyDestination("onms3/Simon"); 
     Destination queue = new AMQAnyDestination("onms3/Simon;{create: always, node:{type: topic } }"); 
     //Destination queue = new AMQAnyDestination("onms3/Simon;%7Bcreate%3A%20always%2C%20node%3A%7Btype%3A%20topic%20%7D%20%7D"); 
     //Destination queue = new AMQAnyDestination("amqp:OpenNMSExchange/Taylor; {create: always}"); 
     //Destination queue = new AMQAnyDestination("OpenNMSExchange; {create: always}"); 
     MessageProducer producer = session.createProducer(queue); 

ListMessage m = ((org.apache.qpid.jms.Session)session).createListMessage(); 
     m.setIntProperty("Id", 987654321); 
     m.setStringProperty("name", "WidgetSimon"); 
     m.setDoubleProperty("price", 0.99); 

     List<String> colors = new ArrayList<String>(); 
     colors.add("red"); 
     colors.add("green"); 
     colors.add("white"); 
     m.add(colors); 

     Map<String,Double> dimensions = new HashMap<String,Double>(); 
     dimensions.put("length",10.2); 
     dimensions.put("width",5.1); 
     dimensions.put("depth",2.0); 
     m.add(dimensions); 
     List<List<Integer>> parts = new ArrayList<List<Integer>>(); 
     parts.add(Arrays.asList(new Integer[] {1,2,5})); 
     parts.add(Arrays.asList(new Integer[] {8,2,5})); 
     m.add(parts); 

     Map<String,Object> specs = new HashMap<String,Object>(); 
     specs.put("colours", colors); 
     specs.put("dimensions", dimensions); 
     specs.put("parts", parts); 
     m.add(specs); 

     producer.send((Message)m); 
     System.out.println("Sent: " + m); 
     connection.close(); 
    } 

} 

Мое предположение состояло в том, что это была проблема подключения к серверу AMQP некоторого описания. Столкнувшись с рядом проблем во время устранения неполадок, если я столкнулся с проблемой из внешней банки, из журналов было ясно, в чем проблема. Это проблема с OpenNMS? У кого-нибудь это работает успешно? Любые идеи?

Приветствия

Simon

ответ

0

Следуя указаниям из списка OpenNMS я решил установить v6.0.3 QPID Broker вместо RabbitMQ и сообщения не течет никаких проблем в настоящее время.

+0

Извините, как я могу просмотреть журнал OpenNMS org.opennms.features.amqp.alarmnorthbounder? – sintetico82

 Смежные вопросы

  • Нет связанных вопросов^_^