2013-04-19 7 views
1

Всякий раз, когда я пытаюсь опубликовать сообщение от ESB к теме сообщения брокера, я получаю эту ошибкуполучает ошибку при публикации сообщения в тему Message Broker от Proxy ESB в

[2013-04-19 14:51:45,930] ERROR - AMQConnection Throwable Received but no listener set: org.wso2.andes.client.AMQNoRoute 
Exception: Error: No Route for message [error code 312: no route] 

Моего прокси-код

<proxy name="SendMessageProxy" transports="http" startOnLoad="true">  
<target>  
<endpoint>   
<address uri="jms:/myTopic?&amp;transport.jms.DestinationType=topic"/>  
</endpoint>  
<inSequence>   
<log level="custom">    
<property name="STATE" value="message is sent to queue"/>   
</log>   
<property name="OUT_ONLY" value="true"/>   
<property name="FORCE_SC_ACCEPTED" value="true" scope="axis2"/>  
</inSequence>  
<outSequence/>  
</target> 
</proxy>` 

Конфигурация моей конфигурации jndi и конфигурации оси 2. Конфигурация my MB работает на порту 9444, а Publisher_esb работает на 9443, а абонентский esb работает на порту 9446. Когда я сохраняю свой абонент активным, а затем, если я отправляю сообщение от своего издатель меня ssage отражается на субцибер.

код, чтобы получить сообщение от абонента

package xml.parser; 

import org.w3c.dom.*; 
import javax.xml.xpath.*; 
import javax.xml.namespace.NamespaceContext; 
import javax.xml.parsers.*; 

import java.io.IOException; 
import java.util.Enumeration; 
import java.util.Iterator; 

import org.xml.sax.InputSource; 
import org.xml.sax.SAXException; 
import javax.jms.*; 
import javax.naming.Context; 
import javax.naming.InitialContext; 
import javax.naming.NamingException; 
import java.util.Properties; 

public class Parser { 

    public static final String QPID_ICF = "org.wso2.andes.jndi.PropertiesFileInitialContextFactory"; 
    private static final String CF_NAME_PREFIX = "connectionfactory."; 
    private static final String CF_NAME = "qpidConnectionfactory"; 
    String userName = "admin"; 
    String password = "admin"; 
    private static String CARBON_CLIENT_ID = "carbon"; 
    private static String CARBON_VIRTUAL_HOST_NAME = "carbon"; 
    private static String CARBON_DEFAULT_HOSTNAME = "localhost"; 
    private static String CARBON_BROKER_PORT = "5673"; 
    String topicName = "myTopic"; 

    public static void main(String[] args) throws NamingException, 
      JMSException, XPathExpressionException, 
      ParserConfigurationException, SAXException, IOException { 

     Parser queueReceiver = new Parser(); 
     String message = queueReceiver.subscribe(); 

    } 


    public String subscribe() throws NamingException, JMSException { 

     String messageContent = ""; 
    Properties properties = new Properties(); 
    properties.put(Context.INITIAL_CONTEXT_FACTORY, QPID_ICF); 
    properties.put(CF_NAME_PREFIX + CF_NAME, 
      getTCPConnectionURL(userName, password)); 
    properties.put("topic." + topicName, topicName); 
    System.out.println("getTCPConnectionURL(userName,password) = " 
      + getTCPConnectionURL(userName, password)); 
    InitialContext ctx = new InitialContext(properties); 
    // Lookup connection factory 
    TopicConnectionFactory connFactory = (TopicConnectionFactory) ctx 
      .lookup(CF_NAME); 
    TopicConnection topicConnection = connFactory.createTopicConnection(); 
    topicConnection.start(); 
    TopicSession topicSession = topicConnection.createTopicSession(false, 
      QueueSession.AUTO_ACKNOWLEDGE); 
    // Send message 
    //Topic topic = topicSession.createTopic(topicName); 
    Topic topic = (Topic) ctx.lookup(topicName); 
    javax.jms.TopicSubscriber topicSubscriber = topicSession 
      .createDurableSubscriber(topic,"admin"); 
    Message message = topicSubscriber.receive(); 
    if (message instanceof TextMessage) { 
     TextMessage textMessage = (TextMessage) message; 
     System.out.println("textMessage.getText() = " 
       + textMessage.getText()); 
     messageContent = textMessage.getText(); 
    } 
    topicSession.close(); 
    topicConnection.close(); 

    return messageContent;  } 

    public String getTCPConnectionURL(String username, String password) { 
     return new StringBuffer().append("amqp://").append(username) 
       .append(":").append(password).append("@") 
       .append(CARBON_CLIENT_ID).append("/") 
       .append(CARBON_VIRTUAL_HOST_NAME).append("?brokerlist='tcp://") 
       .append(CARBON_DEFAULT_HOSTNAME).append(":") 
       .append(CARBON_BROKER_PORT).append("'").toString(); 

    } 

} 

Whne я побежал подписчика впервые он дал мне результат, но после того, что он дает исключение, как:

[2013-04-19 17:24:26,947] ERROR {org.wso2.andes.transport.network.mina.MinaNetworkHandler} - Exception caught by Mina 
java.io.IOException: An existing connection was forcibly closed by the remote host 
     at sun.nio.ch.SocketDispatcher.read0(Native Method) 
     at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:43) 
     at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:218) 
     at sun.nio.ch.IOUtil.read(IOUtil.java:191) 
     at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:359) 
     at org.apache.mina.transport.socket.nio.SocketIoProcessor.read(SocketIoProcessor.java:218) 
     at org.apache.mina.transport.socket.nio.SocketIoProcessor.process(SocketIoProcessor.java:198) 
     at org.apache.mina.transport.socket.nio.SocketIoProcessor.access$400(SocketIoProcessor.java:45) 
     at org.apache.mina.transport.socket.nio.SocketIoProcessor$Worker.run(SocketIoProcessor.java:485) 
     at org.apache.mina.util.NamePreservingRunnable.run(NamePreservingRunnable.java:51) 
     at java.lang.Thread.run(Thread.java:722) 
[2013-04-19 17:24:26,957] ERROR {org.wso2.andes.server.protocol.AMQProtocolEngine} - IOException caught in/127.0.0.1:16 
513(admin), session closed implictly: java.io.IOException: An existing connection was forcibly closed by the remote host 

[2013-04-19 17:33:40,283] INFO {org.wso2.andes.server.protocol.AMQProtocolEngine} - Closing channel due to: Cannot sub 
scribe to queue carbon:admin as it already has an existing exclusive consumer 
[2013-04-19 17:33:40,283] INFO {org.wso2.andes.server.protocol.AMQProtocolEngine} - Channel[1] awaiting closure - proc 
essing close-ok 
[2013-04-19 17:33:40,283] INFO {org.wso2.andes.server.handler.ChannelCloseOkHandler} - Received channel-close-ok for c 
hannel-id 1 
[2013-04-19 17:40:48,867] INFO {org.wso2.andes.server.queue.SimpleAMQQueue} - Auto-deleteing queue:tmp_127_0_0_1_16587 
_1 

Can» t я отправляю сообщения на эту тему, не активируя свой абонент. Как я могу сделать свое сообщение долговечным или постоянным? И еще один вопрос, который у меня есть: Как я могу получить queueName, связанную с темой, которую я создал aor, может ли я создать очередь для моей темы myTopic? Ждем ваших ответов. Заранее спасибо

ответ

0

У прочных подписчиков в MB 2.0.1 есть некоторые известные проблемы, и это может быть из-за этого. Попробуйте с MB 2.1.0 - Alpha version от here и посмотрите, разрешено ли здесь. Эти проблемы будут исправлены в MB 2.1.0, который будет выпущен через несколько недель.