Я написал образец кода для добавления элементов в activemq, а затем их извлечение. Я успешно смог добавить около 1000 элементов, но при извлечении элементов код некогда застревает после извлечения около 50 - 200 элементов, даже если в очереди много элементов.activemq потребитель не возвращает данные, даже если очередь не пуста
Ниже приводится код, который я использовал для добавления элементов в очереди
@POST
@Path("/addelementtoqueue")
@Consumes(MediaType.APPLICATION_FORM_URLENCODED)
public void addElementToQeueue(@FormParam("count") int count) throws Exception {
IntStream.range(0, count)
.forEach(e -> {
try {
addElement(e);
}catch(Exception e1) {
throw new RuntimeException(e1);
}
});
}
private void addElement(int i) throws Exception {
Connection conn = GlobalConfiguration.getJMSConnectionFactory().createConnection();
conn.start();
Session session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageProducer prod = session.createProducer(queue);
prod.send(queue, session.createTextMessage("message "+ i), DeliveryMode.PERSISTENT, 4, 0);
prod.close();
session.close();
conn.close();
}
и это фрагмент кода, я использую для извлечения элементов из очереди
@POST
@Path("/removeelementfromqueue")
@Consumes(MediaType.APPLICATION_FORM_URLENCODED)
public void removeElementToQeueue(@FormParam("count") int count) throws Exception {
IntStream.range(0, count)
.forEach(e -> {
try {
extractElement();
}catch(Exception e1) {
throw new RuntimeException(e1);
}
});
}
private void extractElement() throws Exception {
Connection conn = GlobalConfiguration.getJMSConnectionFactory().createConnection();
conn.start();
Session session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
queue = session.createQueue("walkin.testing");
MessageConsumer consumer = session.createConsumer(queue);
TextMessage msg = (TextMessage)consumer.receive();
System.out.println(msg.getText());
msg.acknowledge();
consumer.close();
session.close();
conn.close();
}
Я получаю фабрику соединений через resource.xml, фрагмент для него:
<resources>
<Resource id="MyJmsResourceAdapter" type="ActiveMQResourceAdapter">
BrokerXmlConfig = jdbcBroker:(tcp://0.0.0.0:61616)
ServerUrl = tcp://0.0.0.0:61616?jms.prefetchPolicy.queuePrefetch=0
</Resource>
<Resource id="MyJmsConnectionFactory" type="javax.jms.ConnectionFactory">
ResourceAdapter = MyJmsResourceAdapter
</Resource></resources>
Я использую activeMQ 5.13.1, с apache-tomee-plus-1.7.2 и Java 8, jdbc хранится как mysql. Я настроил activemq-jdbc-performance.xml в качестве файла конфигурации для apache activemq.
Я пробовал много исследований по этому вопросу, но я не могу определить основную причину этой проблемы. Было бы очень полезно, если кто-нибудь может предложить мне, что я делаю неправильно