Попытка объединить две функции HiveMQ: общие подписки и постоянные сеансы.Общая подписка на HiveMQ с постоянным сеансом
Если вы создали очень простой поставщик сообщений. И очень простой потребитель. При запуске нескольких потребителей все потребители получают все сообщения.
После установки clearSession на «false» для потребителей, при запуске потребителя и перезагрузки потребителя потребитель также получает сообщения, когда он не соединяется. Отлично.
Теперь комбинируя его с функцией общей подписки. Когда используется только общая подписка, а clearSession - «true». При запуске нескольких потребителей сообщение принимается только одним потребителем. Он должен быть цикличным, и это тоже так, но как только вы останавливаете потребителя, сообщения перестают быть круглыми, но один из потребителей получает значительно больше сообщений, чем другие.
Если я снова включил постоянный сеанс, clearSession - это «ложь» и начните использование пользователей общей подписки, потребители начнут получать все сообщения снова, а сообщение просто доставляется одному клиенту.
В чем проблема? Это ошибка в HiveMQ? Может ли постоянная сессия и совместная подписка использоваться совместно? На самом деле это был облом.
UPDATE 15/2/2017 Как @fraschbi предложил я очистил все данные и снова повторно разделяемой подписку с постоянными потребителями сессии. Кажется, это сработало!
Странно, однако, что пропущенные сообщения принимаются только после того, как 1-й потребитель снова подключится. Все потребители имеют одинаковый код, они только начинаются с разных аргументов customerId. См. Код ниже. Моя тестовая последовательность:
- начато consumer1: все сообщения идут этим этим потребителем.
- начал использовать потребитель2: каждый потребитель получает каждое другое сообщение.
- начал потребитель3: каждый потребитель получает 1 из 3 сообщений.
- остановка потребительский1: сейчас потребительский2 и 3 принимаю всевозможные сообщение. (не знаю, почему я видел это неравномерное распределение вчера, но, возможно, как @fraschbi упоминается, потому что я повторно использовал clientId и не отписывал или правильно отключился)
- теперь останавливает consumer2: все сообщения теперь принимаются потребителем3.
- stopping consumer3: больше не получено сообщений.
- перезапуск потребителя3: он продолжает первое сообщение, отправленное производителем. Он не получает потерянные сообщения.
- restart consumer2: сообщения равномерно распределены снова.
- restart consumer1: ЭТО теперь получает все потерянные сообщения, а затем продолжает получать каждое 1 из 3 сообщений.
Так что мой новый вопрос: Почему только 1-й потребитель получает потерянные сообщения?
Примечание: трюк здесь по-прежнему не отменяет подписку при остановке клиента, потому что тогда параметр подписки/сохранения сохраняется!
Producer.scala
object Producer extends App {
val topic = args(0)
val brokerUrl = "tcp://localhost:1883"
val clientId = UUID.randomUUID().toString
val client = new MqttClient(brokerUrl, clientId)
client.connect()
val theTopic = client.getTopic(topic)
var count = 0
sys.addShutdownHook {
println("Disconnecting client...")
client.disconnect()
println("Disconnected.")
}
while(true) {
val msg = new MqttMessage(s"Message: $count".getBytes())
theTopic.publish(msg)
println(s"Published: $msg")
Thread.sleep(1000)
count = count + 1
}
}
Consumer.scala
object Consumer extends App {
val topic = args(0)
val brokerUrl = "tcp://localhost:1883"
val clientId = args(1)
// val clientId = UUID.randomUUID().toString
val client = new MqttClient(brokerUrl, clientId)
client.setCallback(new MqttCallback {
override def deliveryComplete(token: IMqttDeliveryToken) =()
override def messageArrived(topic: String, message: MqttMessage) = println(s"received on topic '$topic': ${new String(message.getPayload)}")
override def connectionLost(cause: Throwable) = println("Connection lost")
})
println(s"Start $clientId consuming from topic: $topic")
val options = new MqttConnectOptions()
options.setCleanSession(false);
client.connect(options)
client.subscribe(topic)
sys.addShutdownHook {
println("Disconnecting client...")
// client.unsubscribe(topic)
client.disconnect()
println("Disconnected.")
}
while(true) {
}
}
Да, клиенты не отписаны. Просто остановился. Завтра снова проверит. –
Уточненный и обновленный вопрос –