2017-02-08 9 views
0

Я пытаюсь эффективно создать поток, используя базу данных в качестве очереди. Причина этого в том, что ожидается, что другие процессы будут читать и отвечать на это сообщение и были разработаны таким образом. К сожалению, я не могу контролировать этот другой процесс и не могу заставить его реагировать на другую систему очередей.Mule Request-Reply Scope не читает ответ с запросом базы данных

Таким образом, поток будет работать следующим образом: HTTP-запрос на вставку записи в базу данных -> Отдельное приложение (за пределами мула) обследует эту таблицу базы данных для сообщений и отвечает другим сообщением в другую таблицу (этот шаг может возьмите> 5 секунд, чтобы ответить) -> прочитайте эту новую строку и ответьте на оригинальный запрос HTTP.

В этом дизайне область запроса-запроса всегда ожидает ответа, пока не появится ответ. (Я вручную установить его на 20 секунд, чтобы показать это довольно быстро)

Response истекло (20000ms) ждет идентификатор ответного сообщения «3e1a7750-ee13-11e6-ae40-0c9920524153» или это действие было прервано. Не удалось выполнить маршрутизацию через конечную точку: null. Полезная нагрузка сообщения имеет тип: Integer

Я явно что-то пропустил и не могу найти правильную документацию для этого из мула. Я надеюсь, что один из хороших пользователей этого сайта может исправить ошибку моих способов.

ниже поток и образец зрения

<flow name="mainFlow"> 
    <http:listener config-ref="HTTP_Listener_Configuration" path="hello" doc:name="HTTP"/> 
    <cxf:jaxws-service doc:name="CXF" configuration-ref="CXF_Configuration" serviceClass="kansas.MuleTestServiceImpl"/> 
    <request-reply doc:name="Request-Reply" timeout="20000"> 
     <db:insert config-ref="Oracle_Configuration" doc:name="Database"> 
      <db:parameterized-query><![CDATA[insert into tblRequest (id, correlationId, replyTo) values (#[message.id], #[message.correlationId], #[message.replyTo])]]></db:parameterized-query> 
     </db:insert> 
     <jms:inbound-endpoint queue="test.response" connector-ref="syncJms" doc:name="JMS"> 
      <jms:transaction action="JOIN_IF_POSSIBLE"/> 
     </jms:inbound-endpoint> 
    </request-reply> 
    <logger message="payload is #[payload]" level="INFO" doc:name="Logger"/> 
</flow> 
<flow name="databasePoller"> 
    <poll doc:name="Poll"> 
     <fixed-frequency-scheduler frequency="5000"/> 
     <db:select config-ref="Oracle_Configuration" doc:name="Database"> 
      <db:parameterized-query><![CDATA[select id,correlationId,msgresponse,replyto from tblResponse]]></db:parameterized-query> 
     </db:select> 
    </poll> 
    <foreach collection="#[payload]" doc:name="For Each"> 
     <set-variable variableName="storedPayload" value="#[payload]" doc:name="storePayload"/> 
     <db:delete config-ref="Oracle_Configuration" doc:name="Database"> 
      <db:parameterized-query><![CDATA[delete from tblResponse where correlationId = #[storedPayload.correlationId]]]></db:parameterized-query> 
     </db:delete> 
     <set-payload value="#[flowVars.storedPayload]" doc:name="restorePayload"/> 
     <message-properties-transformer overwrite="true" doc:name="Message Properties"> 
      <add-message-property key="MULE_CORRELATION_ID" value="#[payload.ID]"/> 
      <add-message-property key="MULE_REPLYTO" value="#[payload.REPLYTO]"/> 
     </message-properties-transformer> 
     <set-payload value="#[payload.MSGRESPONSE]" doc:name="Set Payload"/> 
     <jms:outbound-endpoint queue="test.response" connector-ref="syncJms" doc:name="JMS"/> 
     <logger level="INFO" doc:name="Logger"/> 
    </foreach> 
</flow> 

enter image description here

ИСКЛЮЧЕНИЕ НИЖЕ


Сообщение: Ответ истекло (20000ms) ждет идентификатор ответного сообщения "b9a93d10-efa4-11e6-808b-0c9920524153" или это действие было прервано. Не удалось выполнить маршрутизацию через конечную точку: null. Полезная нагрузка сообщение имеет тип: Integer Тип: org.mule.api.routing.ResponseTimeoutException Код: MULE_ERROR - 2 JavaDoc: http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/api/routing/ResponseTimeoutException.html Полезная нагрузка: 1

Корень стека исключений трассировки: org.mule.api. routing.ResponseTimeoutException: время ожидания ответа (20000 мс), ожидающее идентификатора ответа «b9a93d10-efa4-11e6-808b-0c9920524153», или это действие было прервано. Не удалось выполнить маршрутизацию через конечную точку: null. Полезная нагрузка сообщение имеет тип: Integer в org.mule.routing.requestreply.AbstractAsyncRequestReplyRequester.receiveAsyncReply (AbstractAsyncRequestReplyRequester.java:283) в org.mule.routing.requestreply.AbstractAsyncRequestReplyRequester.process (AbstractAsyncRequestReplyRequester.java:89)

+0

Три вещи могут помочь. Сначала вы могли бы описать немного более подробное, чего бы вы хотели достичь. Во-вторых, добавьте полные записи в журнал, а третий вы можете попробовать реорганизовать. Переместите часть БД в субпоток, который вы вызываете через очередь VM. Добавьте исходящую очередь очереди VM (как запрос-образец обмена) в области ответа на запрос и вызовите новый поток с помощью соединителя VM и DB. Таким образом, Mule может реально реагировать, поскольку он может не найти ответ (временная очередь в jMS-сценарии) в вашей текущей конфигурации. – brazo

+0

Также эта информация может быть более просвещенной: https://www.ricston.com/blog/usecase-explaining-behaviour-requestreply-block/ – brazo

ответ

0

Я считаю, что я решил это. Я получил эту работу полностью с конечной точкой HTTP-запроса, а не с помощью опроса базы данных. При просмотре сообщения, кажется, что одна разница была через моего опросчика базы данных, mule добавляет 2 дополнительных свойства. MULE_CORRELATION_SEQUENCE и MULE_CORRELATION_GROUP_SIZE.

Удалив эти свойства перед отправкой сообщения в jms, разрешите области запроса-ответа правильно идентифицировать ответ в очереди jms.

<remove-property propertyName="MULE_CORRELATION_SEQUENCE" doc:name="Property"/> <remove-property propertyName="MULE_CORRELATION_GROUP_SIZE" doc:name="Property"/>

0

Я немного упростил поток, чтобы заставить его работать. Это может быть подходящее решение, если вы вместо опроса БД в отдельном потоке запускаете его с помощью очереди VM.

<flow name="mainFlow"> 
    <http:listener config-ref="HTTP_Listener_Configuration" 
     path="hello" doc:name="HTTP" /> 
    <dw:transform-message doc:name="Transform Message"> 
     <dw:set-payload> 
      <![CDATA[%dw 1.0 %output application/java 
      --- 
      { 
      name: "abc", 
      euro: 130, 
      usd: 123 
      }]]></dw:set-payload> 
    </dw:transform-message> 
    <request-reply doc:name="Request-Reply" timeout="20000"> 
     <vm:outbound-endpoint exchange-pattern="one-way" path="db" doc:name="VM"> 
      <message-properties-transformer scope="outbound"> <delete-message-property key="MULE_REPLYTO"/> </message-properties-transformer> 
      </vm:outbound-endpoint> 
     <vm:inbound-endpoint exchange-pattern="one-way" path="test.response" doc:name="VM"/> 
    </request-reply> 
    <logger message="#[message.payloadAs(java.lang.String)]" level="INFO" doc:name="Logger"/> 
</flow> 
<flow name="soFlow"> 
    <vm:inbound-endpoint exchange-pattern="one-way" path="db" doc:name="VM"/> 
    <db:insert config-ref="MySQL_Configuration" doc:name="Database"> 
     <db:parameterized-query><![CDATA[insert into item (
      item_name, 
      price_euro, 
      price_usd) 
       values (#[payload.name], #[payload.euro], #[payload.usd])]]>    </db:parameterized-query> 
    </db:insert> 
    <vm:outbound-endpoint exchange-pattern="one-way" path="call-db-query" doc:name="VM"/> 
</flow> 
<flow name="databasePoller"> 
    <vm:inbound-endpoint exchange-pattern="one-way" path="call-db-query" doc:name="VM"/> 
    <db:select config-ref="MySQL_Configuration" doc:name="Database"> 
     <db:parameterized-query><![CDATA[select item_name, 
     price_euro, 
     price_usd from item]]></db:parameterized-query> 
    </db:select> 
    <vm:outbound-endpoint exchange-pattern="one-way" path="test.response" doc:name="VM"/> 
</flow> 
+0

К сожалению, я не могу спросить программное обеспечение, которое делает запрос на переработку своей системы для этого , У них было требование об синхронном запросе HTTP, вероятно, из-за устаревших потребностей программного обеспечения. Я действительно хотел, чтобы у меня была эта способность. – Beta033