2016-06-06 10 views
9

Я пытаюсь понять, как работать с потоками в Java-клиенте, который подключается к HornetQ. Я не получаю определенную ошибку, но не понимаю, как я должен работать с потоками в первую очередь (относительно клиента HornetQ и, в частности, MessageHandler.onMessage() - нити вообще не представляют для меня проблем).Обработка потоков в Java HornetQ client

В случае, если это необходимо: я использую 'org.hornetq:hornetq-server:2.4.7.Final' для запуска сервера, встроенного в мое приложение. Я не собираюсь это делать. В моей ситуации это просто более удобно с точки зрения ops, чем запуск автономного процесса сервера.

То, что я сделал до сих пор:

  1. создать встроенный сервер: new EmbeddedHornetQ(), .setConfiguration()

  2. создать сервер локатора: HornetQClient.createServerLocator(false, new TransportConfiguration(InVMConnectorFactory.class.getName()))

  3. создать сессионный завод: serverLocator.createSessionFactory()

Теперь мне кажется очевидным, что я могу создать sessi при использовании hornetqClientSessionFactory.createSession(), создайте производителя и потребителя для этого сеанса и обработайте сообщения в пределах одного потока, используя .send() и .receive().

Но я также обнаружил consumer.setMessageHandler(), и это говорит мне, что я вообще не понял потоки в клиенте. Я попытался использовать его, но затем потребитель вызывает messageHandler.onMessage() в двух потоках, отличных от того, который создал сеанс. Это похоже на мое впечатление от просмотра кода - клиент HornetQ использует пул потоков для отправки сообщений.

Это оставляет меня в замешательстве. Javadocs говорят, что сеанс "single-thread object", и код согласен - никакой очевидной синхронизации не происходит. Но с onMessage(), вызываемым в нескольких потоках, message.acknowledge() также вызывается в нескольких потоках, и тот просто делегирует сессию. Как это должно работать? Как будет выглядеть сценарий, в котором MessageHandler НЕ обращается к сеансу из нескольких потоков?

Идти дальше, Как я могу отправить последующие сообщения изнутри onMessage()? Я использую HornetQ для постоянной «рабочей» рабочей очереди, поэтому отправка последующих сообщений является типичным прецедентом для меня. Но опять же, в пределах onMessage(), я не в порядке для доступа к сеансу.

Обратите внимание, что я был бы в порядке, держась подальше от MessageHandler и просто используя send()/receive() таким образом, чтобы я мог контролировать потоки. Но я убежден, что я вообще не понимаю всю ситуацию, и это в сочетании с многопоточным процессом просто требует неприятностей.

+0

Эти последующие сообщения: вы знаете заранее, куда вы собираетесь их отправлять? Являются ли последующие пункты назначения статичными или динамическими? – Tair

+0

Адрес и очередь такие же, как и для исходного сообщения - одна очередь рабочих единиц. Не уверен, что вы подразумеваете под статикой/динамикой. Они динамичны в том смысле, что исходное рабочее устройство должно обрабатываться, чтобы знать, какие последующие сообщения должны быть сгенерированы. –

+0

Я имею в виду, почему вы обеспокоены потоковой безопасностью сеанса, если вы можете передать продюсера обработчику сообщений (вместо этого, если сеанс). – Tair

ответ

1

Я могу ответить на часть вашего вопроса, хотя, надеюсь, вы уже исправили проблему.

Сформировать HornetQ documentation on ClientConsumer (Курсив мой):

ClientConsumer получает сообщения из очереди HornetQ.
Сообщения могут быть использованы синхронно с использованием методов receive(), которые будут блокироваться до тех пор, пока сообщение не будет получено (или истечет время ожидания) или асинхронно, установив MessageHandler.
Эти 2 типа потребления являются эксклюзивными: ClientConsumer с набором MessageHandler будет вызывать HornetQException, если вызываются методы receive().

Так у вас есть два варианта на обработку приема сообщений:

  1. Синхронизировать прием самостоятельно
    • Do не обеспечивают MessageListener к HornetQ
    • В вашем собственном cunsumer тему , введите .receive() или .receive(long itmeout) на досуге
    • Получить (дополнительно) ClientMessage объекта, возвращаемого вызовом
      • Pro: Использованием Session вы, надеюсь, нести в Потребителя вы можете направить сообщение по своему усмотрению
      • Con: Всего этого сообщения обработка будет последовательным
  2. Делегат синхронизации потоков в HornetQ
    • Do не вызывать .receive() на потребителя
    • Обеспечить реализацию MessageListener из onMessage(ClientMessage)
      • Pro: Все обработки сообщений будет одновременно и быстро, без проблем
      • Con: Я не думаю, что можно получить Session от этого объекта, так как он не отображается интерфейсом ,
    • Непроверено обходной путь: В моем приложении (которое в-VM, как ваша), я разоблачил, лежащий в основе, поточно-QueueConnection как статическая переменная доступна приложения шириной. Из вашего MessageListener вы можете вызывать QueueSession jmsSession = jmsConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); на нем, чтобы получить новый сеанс и отправить свои сообщения от него ... Вероятно, это alright, насколько я могу, see, потому что объект Session не был повторно создан. Я также сделал это, потому что у Сеансов была тенденция к устареванию.

Я не думаю, что вы должны хотеть настолько, чтобы контролировать вашего исполнения Сообщение потоков , особенно временные потоки, просто пересылают сообщения. У HornetQ есть встроенные пулы потоков, как вы догадались, и эффективно использует эти объекты.

Кроме того, как вы знаете, вам не нужно находиться в одном потоке для доступа к объекту (например, к очереди), поэтому не имеет значения, доступен ли доступ к очереди через несколько потоков или даже через несколько сеансов.Вам нужно только убедиться, что сеанс доступен только одним потоком, и это по дизайну с MessageListener.

 Смежные вопросы

  • Нет связанных вопросов^_^