2015-06-25 11 views
1

Я тестировал MQTT для проекта. Я также могу получать сообщения по теме, на которую мой клиент подписался, когда клиент подключен. Я установил QoS в 1 и для параметра cleanSession установлено значение false. Но я не могу получать сообщения, которые были отправлены на тему подписки, когда мой клиент снова подключится. В моем приложении почти вся работа выполняется службой-помощником.Paho MQTT cleanSession установлен в false, но не получает сообщений

Вот мой код

AndroidManifest.xml

<?xml version="1.0" encoding="utf-8"?> 

<uses-permission android:name="android.permission.WAKE_LOCK" /> 
<uses-permission android:name="android.permission.INTERNET" /> 
<uses-permission android:name="android.permission.WRITE_EXTERNAL_STORAGE" /> 
<uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" /> 

<application 
    android:allowBackup="true" 
    android:icon="@mipmap/ic_launcher" 
    android:label="@string/app_name" 
    android:theme="@style/AppTheme" > 
    <activity 
     android:name=".MainActivity" 
     android:label="@string/app_name" 
     android:screenOrientation="portrait" > 
     <intent-filter> 
      <action android:name="android.intent.action.MAIN" /> 

      <category android:name="android.intent.category.LAUNCHER" /> 
     </intent-filter> 
    </activity> 

    <service 
     android:name=".MqttHelperService" 
     android:enabled="true" 
     android:exported="true" /> 

    <!-- MqttService --> 
    <service android:name="org.eclipse.paho.android.service.MqttService" /> 
</application> 

MainActivity.java

package com.prateek.mqtttest; 

import android.app.Activity; 
import android.content.Intent; 
import android.os.Bundle; 

public class MainActivity extends Activity { 
    @Override 
    protected void onCreate(Bundle savedInstanceState) { 
     super.onCreate(savedInstanceState); 
     setContentView(R.layout.activity_main); 
     startService(new Intent(getBaseContext(), MqttHelperService.class)); 
    } 
} 

MqttHelperService.java

package com.prateek.mqtttest; 

import android.app.Service; 
import android.content.Intent; 
import android.os.Binder; 
import android.os.IBinder; 
import android.widget.Toast; 

import org.eclipse.paho.android.service.MqttAndroidClient; 
import org.eclipse.paho.client.mqttv3.IMqttActionListener; 
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; 
import org.eclipse.paho.client.mqttv3.IMqttToken; 
import org.eclipse.paho.client.mqttv3.MqttCallback; 
import org.eclipse.paho.client.mqttv3.MqttConnectOptions; 
import org.eclipse.paho.client.mqttv3.MqttException; 
import org.eclipse.paho.client.mqttv3.MqttMessage; 

public class MqttHelperService extends Service implements MqttCallback { 

    private static final String MQTT_URI = "tcp://broker.mqttdashboard.com:1883"; 
    private static final String CLIENT_ID = "prateek"; 
    private static final String MQTT_TOPIC = "mqttmessenger"; 
    private static final int QOS = 1; 
    private MqttAndroidClient client; 

    public MqttHelperService() { 
    } 

    @Override 
    public int onStartCommand(Intent intent, int flags, int startId) { 
     Toast.makeText(this, "MQTT Helper Service Started", Toast.LENGTH_SHORT).show(); 
     new Thread(new Runnable() { 
      @Override 
      public void run() { 
       connect(); 
      } 
     }, "MqttHelperService").start(); 
     return START_STICKY; 
    } 

    public class MqttHelperBinder extends Binder { 
     public MqttHelperService getService(){ 
      return MqttHelperService.this; 
     } 
    } 

    public void connect() { 
     client = new MqttAndroidClient(this, MQTT_URI, CLIENT_ID); 
     client.setCallback(this); 

     try { 
      MqttConnectOptions options = new MqttConnectOptions(); 
      options.setCleanSession(false); 
      client.connect(options, new IMqttActionListener() { 
       @Override 
       public void onSuccess(IMqttToken iMqttToken) { 
        Toast.makeText(getBaseContext(), "connected to MQTT broker", Toast.LENGTH_SHORT).show(); 
        subscribe(); 
       } 

       @Override 
       public void onFailure(IMqttToken iMqttToken, Throwable throwable) { 
        Toast.makeText(getBaseContext(), "failed to connect: " + throwable.getMessage(), Toast.LENGTH_SHORT).show(); 
       } 
      }); 
     } catch (MqttException e) { 
      Toast.makeText(this, "could not connect to MQTT broker at " + MQTT_URI, Toast.LENGTH_SHORT).show(); 
     } 
    } 

    public void subscribe() { 
     try { 
      IMqttToken token = client.subscribe(MQTT_TOPIC, QOS, null, new IMqttActionListener() { 
       @Override 
       public void onSuccess(IMqttToken iMqttToken) { 
        Toast.makeText(getBaseContext(), "subscription successful", Toast.LENGTH_SHORT).show(); 
       } 

       @Override 
       public void onFailure(IMqttToken iMqttToken, Throwable throwable) { 
        Toast.makeText(getBaseContext(), "subscription failed: " + throwable, Toast.LENGTH_SHORT).show(); 
       } 
      }); 

     } catch (MqttException e) { 
      Toast.makeText(this, "could not subscribe", Toast.LENGTH_SHORT).show(); 
     } 
    } 

    @Override 
    public IBinder onBind(Intent intent) { 
     // TODO: Return the communication channel to the service. 
     throw new UnsupportedOperationException("Not yet implemented"); 
    } 

    @Override 
    public void connectionLost(Throwable throwable) { 
     Toast.makeText(this, "connection lost", Toast.LENGTH_SHORT).show(); 
    } 

    @Override 
    public void messageArrived(String s, MqttMessage mqttMessage) throws Exception { 
     Toast.makeText(this, "message received on topic " + s, Toast.LENGTH_SHORT).show(); 
    } 

    @Override 
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) { 
    } 

    @Override 
    public void onDestroy() { 
     super.onDestroy(); 
     Toast.makeText(this, "Service Destroyed", Toast.LENGTH_SHORT).show(); 
    } 


} 

Я даже проверил эту ссылку With the clear session flag set to FALSE, I am missing the published values но не смог найти ошибку в своем коде

ответ

4

У меня была такая же проблема в последнее время. Я думаю, что решение очень простое, но я потратил много часов, чтобы понять это.

Эта линия была 'плохой':

client.connect(mqttOptions, mqqtActionListener); 

'правильное' является:

client.connect(mqttOptions, null, mqqtActionListener); 

Если вы вызываете подключить метод с 2 параметрами, вы используете этот конструктор:

public IMqttToken connect(Object userContext, IMqttActionListener callback) throws MqttException 

Вместо правого:

public IMqttToken connect(MqttConnectOptions options, Object userContext, IMqttActionListener callback) throws MqttException 

Я надеюсь, что это ваша проблема.

+0

, пожалуйста, помогите мне. http://stackoverflow.com/questions/36056381/the-listener-of-publish-method-does-not-work-asynchronously-on-mqtt-in-eclipse-p –

+0

Большое спасибо, я провел очень много часов, глядя вокруг, чтение, перезапись моего класса обслуживания, изменение брокеры, в конце концов, это была проблема. – user3564573

+0

Я счастлив, что смогу помочь. :) – wyzard

1

Скорее всего, проблема в том, что сообщения публикуются были QoS = 0. Как подписка, так и публикация должны быть QoS> 0, чтобы быть поставленными в очередь для длительного клиента.

Вы подтвердили, что сообщения публикуются с использованием QoS 1, поэтому следующая задача - проверить с помощью известных рабочих инструментов, чтобы устранить возможные проблемы с вашим кодом. Я пробовал:

mosquitto_sub -i prateek -t mqttmessenger -h broker.mqttdashboard.com -v -d -c -q 1 

И проверить, что я мог бы получать сообщения в то время это было связано:

mosquitto_pub -q 1 -t mqttmessenger -m hello2 -h broker.mqttdashboard.com 

Затем я отключил клиента mosquitto_sub и опубликовал еще одно сообщение с той же командой mosquitto_pub. Повторное подключение mosquitto_sub с той же командой не создало никакого сообщения, как вы видите. Повторяя процедуру, но используя test.mosquitto.org в качестве брокера, поведение будет таким, как ожидалось. Похоже, что broker.mqttdashboard.com не настроен для долговечных клиентов.

+0

Сообщения также публикуются с QoS = 1 –