Я пытаюсь понять, как работать с потоками в Java-клиенте, который подключается к HornetQ. Я не получаю определенную ошибку, но не понимаю, как я должен работать с потоками в первую очередь (относительно клиента HornetQ и, в частности, MessageHandler.onMessage()
- нити вообще не представляют для меня проблем).Обработка потоков в Java HornetQ client
В случае, если это необходимо: я использую 'org.hornetq:hornetq-server:2.4.7.Final'
для запуска сервера, встроенного в мое приложение. Я не собираюсь это делать. В моей ситуации это просто более удобно с точки зрения ops, чем запуск автономного процесса сервера.
То, что я сделал до сих пор:
создать встроенный сервер:
new EmbeddedHornetQ(), .setConfiguration()
создать сервер локатора:
HornetQClient.createServerLocator(false, new TransportConfiguration(InVMConnectorFactory.class.getName()))
- создать сессионный завод:
serverLocator.createSessionFactory()
Теперь мне кажется очевидным, что я могу создать sessi при использовании hornetqClientSessionFactory.createSession()
, создайте производителя и потребителя для этого сеанса и обработайте сообщения в пределах одного потока, используя .send()
и .receive()
.
Но я также обнаружил consumer.setMessageHandler()
, и это говорит мне, что я вообще не понял потоки в клиенте. Я попытался использовать его, но затем потребитель вызывает messageHandler.onMessage()
в двух потоках, отличных от того, который создал сеанс. Это похоже на мое впечатление от просмотра кода - клиент HornetQ использует пул потоков для отправки сообщений.
Это оставляет меня в замешательстве. Javadocs говорят, что сеанс "single-thread object"
, и код согласен - никакой очевидной синхронизации не происходит. Но с onMessage()
, вызываемым в нескольких потоках, message.acknowledge()
также вызывается в нескольких потоках, и тот просто делегирует сессию. Как это должно работать? Как будет выглядеть сценарий, в котором MessageHandler НЕ обращается к сеансу из нескольких потоков?
Идти дальше, Как я могу отправить последующие сообщения изнутри onMessage()? Я использую HornetQ для постоянной «рабочей» рабочей очереди, поэтому отправка последующих сообщений является типичным прецедентом для меня. Но опять же, в пределах onMessage()
, я не в порядке для доступа к сеансу.
Обратите внимание, что я был бы в порядке, держась подальше от MessageHandler и просто используя send()/receive()
таким образом, чтобы я мог контролировать потоки. Но я убежден, что я вообще не понимаю всю ситуацию, и это в сочетании с многопоточным процессом просто требует неприятностей.
Эти последующие сообщения: вы знаете заранее, куда вы собираетесь их отправлять? Являются ли последующие пункты назначения статичными или динамическими? – Tair
Адрес и очередь такие же, как и для исходного сообщения - одна очередь рабочих единиц. Не уверен, что вы подразумеваете под статикой/динамикой. Они динамичны в том смысле, что исходное рабочее устройство должно обрабатываться, чтобы знать, какие последующие сообщения должны быть сгенерированы. –
Я имею в виду, почему вы обеспокоены потоковой безопасностью сеанса, если вы можете передать продюсера обработчику сообщений (вместо этого, если сеанс). – Tair