2017-02-14 10 views
0

Попытка объединить две функции HiveMQ: общие подписки и постоянные сеансы.Общая подписка на HiveMQ с постоянным сеансом

Если вы создали очень простой поставщик сообщений. И очень простой потребитель. При запуске нескольких потребителей все потребители получают все сообщения.

После установки clearSession на «false» для потребителей, при запуске потребителя и перезагрузки потребителя потребитель также получает сообщения, когда он не соединяется. Отлично.

Теперь комбинируя его с функцией общей подписки. Когда используется только общая подписка, а clearSession - «true». При запуске нескольких потребителей сообщение принимается только одним потребителем. Он должен быть цикличным, и это тоже так, но как только вы останавливаете потребителя, сообщения перестают быть круглыми, но один из потребителей получает значительно больше сообщений, чем другие.

Если я снова включил постоянный сеанс, clearSession - это «ложь» и начните использование пользователей общей подписки, потребители начнут получать все сообщения снова, а сообщение просто доставляется одному клиенту.

В чем проблема? Это ошибка в HiveMQ? Может ли постоянная сессия и совместная подписка использоваться совместно? На самом деле это был облом.

UPDATE 15/2/2017 Как @fraschbi предложил я очистил все данные и снова повторно разделяемой подписку с постоянными потребителями сессии. Кажется, это сработало!

Странно, однако, что пропущенные сообщения принимаются только после того, как 1-й потребитель снова подключится. Все потребители имеют одинаковый код, они только начинаются с разных аргументов customerId. См. Код ниже. Моя тестовая последовательность:

  • начато consumer1: все сообщения идут этим этим потребителем.
  • начал использовать потребитель2: каждый потребитель получает каждое другое сообщение.
  • начал потребитель3: каждый потребитель получает 1 из 3 сообщений.
  • остановка потребительский1: сейчас потребительский2 и 3 принимаю всевозможные сообщение. (не знаю, почему я видел это неравномерное распределение вчера, но, возможно, как @fraschbi упоминается, потому что я повторно использовал clientId и не отписывал или правильно отключился)
  • теперь останавливает consumer2: все сообщения теперь принимаются потребителем3.
  • stopping consumer3: больше не получено сообщений.
  • перезапуск потребителя3: он продолжает первое сообщение, отправленное производителем. Он не получает потерянные сообщения.
  • restart consumer2: сообщения равномерно распределены снова.
  • restart consumer1: ЭТО теперь получает все потерянные сообщения, а затем продолжает получать каждое 1 из 3 сообщений.

Так что мой новый вопрос: Почему только 1-й потребитель получает потерянные сообщения?

Примечание: трюк здесь по-прежнему не отменяет подписку при остановке клиента, потому что тогда параметр подписки/сохранения сохраняется!

Producer.scala

object Producer extends App { 

    val topic = args(0) 
    val brokerUrl = "tcp://localhost:1883" 

    val clientId = UUID.randomUUID().toString 

    val client = new MqttClient(brokerUrl, clientId) 
    client.connect() 
    val theTopic = client.getTopic(topic) 

    var count = 0 

    sys.addShutdownHook { 
    println("Disconnecting client...") 
    client.disconnect() 
    println("Disconnected.") 
    } 

    while(true) { 
    val msg = new MqttMessage(s"Message: $count".getBytes()) 
    theTopic.publish(msg) 
    println(s"Published: $msg") 

    Thread.sleep(1000) 

    count = count + 1 
    } 
} 

Consumer.scala

object Consumer extends App { 

    val topic = args(0) 
    val brokerUrl = "tcp://localhost:1883" 

    val clientId = args(1) 
// val clientId = UUID.randomUUID().toString 

    val client = new MqttClient(brokerUrl, clientId) 
    client.setCallback(new MqttCallback { 
    override def deliveryComplete(token: IMqttDeliveryToken) =() 

    override def messageArrived(topic: String, message: MqttMessage) = println(s"received on topic '$topic': ${new String(message.getPayload)}") 

    override def connectionLost(cause: Throwable) = println("Connection lost") 
    }) 

    println(s"Start $clientId consuming from topic: $topic") 
    val options = new MqttConnectOptions() 
    options.setCleanSession(false); 

    client.connect(options) 
    client.subscribe(topic) 

    sys.addShutdownHook { 
    println("Disconnecting client...") 
// client.unsubscribe(topic) 
    client.disconnect() 
    println("Disconnected.") 
    } 


    while(true) { 

    } 

} 

ответ

1

Я попытаюсь ответить на два вопроса вы испытываете отдельно.

Это должно быть круговое движение, и это также имеет место, но как только вы останавливаете потребителя, сообщения больше не крутятся, а один из потребителей получает значительно больше сообщений, чем другие (-ы) ,

HiveMQ предпочитает онлайн-клиентов при распространении сообщений для общих подписчиков.

Если я снова включил постоянный сеанс, clearSession является «ложным» и запускает пользователей общей подписки, потребители начинают получать все сообщения снова, а сообщение просто доставляется одному клиенту.

В начале вашего вопроса вы сообщили, что вы связываете клиентов с cleanSession=false брокеру и подписываетесь на эту тему. (Звучит так, будто вы используете только одну тему.) Возможно ли, что вы не отменяете подписку на эти клиенты до повторного подключения с помощью cleanSession=false и общих подписки? В этом случае подписки с первого шага вашего сценария будут сохраняться для этих клиентов, и, естественно, каждый из них получит сообщения.

EDIT:

Так что мой новый вопрос: почему только первый потребитель получить потерянные сообщения?

Из Руководства пользователя HiveMQ:

Когда клиенты форума очередь заполнена, то сообщение для этого клиента не пострадает, но в очереди для следующего автономного клиента в общей абонентской группе.

Когда все клиенты находятся в автономном режиме, дистрибутив больше не крутится. Таким образом, сценарий, который вы описываете, находится в ожидаемом поведении.

Значение по умолчанию для очереди сообщений равно 1000. Таким образом, вы можете отправить более 1000 сообщений, в то время как клиенты находятся в автономном режиме, или уменьшить размер очереди сообщений.

... 
<persistence> 
    <queued-messages> 

     <max-queued-messages>50</max-queued-messages> 

    </queued-messages> 
    ... 
</persistence> 
... 

Добавьте это в config.xml для уменьшения размера очереди сообщений.

+0

Да, клиенты не отписаны. Просто остановился. Завтра снова проверит. –

+0

Уточненный и обновленный вопрос –

 Смежные вопросы

  • Нет связанных вопросов^_^