2016-07-17 4 views
1

У меня есть Android-сервис (Service) для прослушивания сообщений MQTT. Ниже приведён код. По какой-то причине сервис может подключиться к брокеру и подписаться на топик, но не получает сообщения: метод messageArrived никогда не вызывается.

Android MQTT с использованием клиента Paho: не удаётся получить сообщения

/**
 * Bound Android service that connects to an MQTT broker and logs every
 * message received on the topics it subscribes to.
 *
 * <p>The service itself is the {@link MqttCallback}; it must be registered on
 * the client with {@code setCallback(this)}, otherwise the client connects and
 * subscribes successfully but {@link #messageArrived} is never invoked.
 */
public class FollowService extends Service implements MqttCallback {

    private static final String TAG = "Service";

    private final IBinder localBinder = new FollowServiceBinder();

    /** MQTT client; stays {@code null} if construction failed in {@link #onCreate()}. */
    private MqttClient mqClient;

    /** Binder handing the running service instance to bound components. */
    public class FollowServiceBinder extends Binder {
        public FollowService getService() {
            return FollowService.this;
        }
    }

    public FollowService() {
    }

    @Override
    public IBinder onBind(Intent intent) {
        return localBinder;
    }

    /**
     * Creates the MQTT client, registers this service as its callback and
     * connects to the broker. Failures are logged, not rethrown.
     */
    @Override
    public void onCreate() {
        super.onCreate();
        try {
            mqClient = new MqttClient("tcp://192.168.1.46:1883", "sadfsfi", new MemoryPersistence());
            // Register the callback BEFORE connecting so that no event is missed.
            mqClient.setCallback(this);
            // NOTE(review): MqttClient.connect() is a blocking call made from onCreate();
            // consider moving it off the main thread.
            mqClient.connect();
            Log.i(TAG, "Connected to client");
        } catch (MqttException me) {
            Log.e(TAG, "MqttClient Exception Occured in on create!!!", me);
        }
    }

    /**
     * Disconnects and releases the MQTT client so the connection does not
     * outlive the service.
     */
    @Override
    public void onDestroy() {
        if (mqClient != null) {
            try {
                if (mqClient.isConnected()) {
                    mqClient.disconnect();
                }
                mqClient.close();
            } catch (MqttException me) {
                Log.e(TAG, "MqttClient Exception Occured in on destroy!!!", me);
            }
        }
        super.onDestroy();
    }

    /**
     * Subscribes to the {@code "test"} topic. Does nothing (besides logging)
     * when the client could not be created.
     */
    @Keep
    public void beginFollowing() {
        if (mqClient == null) {
            // Client construction failed in onCreate(); avoid a NullPointerException here.
            Log.e(TAG, "MqttClient is not initialised, cannot subscribe");
            return;
        }
        try {
            mqClient.subscribe("test");
            Log.i(TAG, "Subscribed test");
        } catch (MqttException me) {
            Log.e(TAG, "MqttClient Exception Occured in following!!!", me);
        }
    }

    @Override
    public void connectionLost(Throwable cause) {
        Log.i(TAG, "ConnectionLost", cause);
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        Log.i(TAG, "Delivered");
    }

    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        Log.i(TAG, "Received update: " + topic + ":" + message.toString());
    }
}

ответ

1

Существует библиотека Eclipse Paho Android Service, предназначенная специально для Android; её можно использовать вместо обычного MqttClient. Это может решить вашу проблему (если вы уверены, что проблема не на стороне MQTT-сервера), а также некоторые другие проблемы, которые могут возникнуть в будущем, если вы хотите построить MQTT-сервис для Android:

Если хотите попробовать:

в build.gradle:

// Paho MQTT v3 Java client library.
compile 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.0.3-SNAPSHOT' 
// Paho Android service; its own 'support-v4' module is excluded here and
// support-v4 is declared explicitly below instead.
compile ('org.eclipse.paho:org.eclipse.paho.android.service:1.0.3-SNAPSHOT'){ 
    exclude module: 'support-v4' 
} 
// NOTE(review): the two Paho artifacts above are SNAPSHOT versions; confirm
// the repository that serves them and consider pinning a release.
compile 'com.android.support:support-v4:22.1.0' 

В AndroidManifest.xml:

<uses-permission android:name="android.permission.INTERNET" /> 

и внутри вашего элемента <application></application>:

<service android:name="org.eclipse.paho.android.service.MqttService" /> 

Вот пример MqttHandler.java:

public class MqttHandler { 

    protected final static String TAG = DeviceHandler.class.getSimpleName(); 

    /** 
    * MQTT client 
    */ 
    private MqttAndroidClient mClient = null; 

    /** 
    * client ID used to authenticate 
    */ 
    protected String mClientId = ""; 

    /** 
    * Android context 
    */ 
    private Context mContext = null; 

    /** 
    * callback for MQTT events 
    */ 
    private MqttCallback mClientCb = null; 

    /** 
    * callback for MQTT connection 
    */ 
    private IMqttActionListener mConnectionCb = null; 

    /** 
    * Sets whether the client and server should remember state across restarts and reconnects 
    */ 
    protected boolean mCleanSessionDefault = false; 

    /** 
    * Sets the connection timeout value (in seconds) 
    */ 
    protected int mTimeoutDefault = 30; 

    /** 
    * Sets the "keep alive" interval (in seconds) 
    */ 
    protected int mKeepAliveDefault = 60; 

    /** 
    * connection state 
    */ 
    private boolean connected = false; 

    /** 
    * list of message callbacks 
    */ 
    private List<IMessageCallback> mMessageCallbacksList = new ArrayList<>(); 

    private final static String SERVER_URI = "192.168.1.46"; 

    private final static int SERVER_PORT = 1883; 

    public MqttHandler(Context context) { 

     this.mContext = context; 

     this.mClientCb = new MqttCallback() { 
      @Override 
      public void connectionLost(Throwable cause) { 
       connected = false; 
       for (int i = 0; i < mMessageCallbacksList.size(); i++) { 
        mMessageCallbacksList.get(i).connectionLost(cause); 
       } 
      } 

      @Override 
      public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception { 
       for (int i = 0; i < mMessageCallbacksList.size(); i++) { 
        mMessageCallbacksList.get(i).messageArrived(topic, mqttMessage); 
       } 
      } 

      @Override 
      public void deliveryComplete(IMqttDeliveryToken token) { 
       for (int i = 0; i < mMessageCallbacksList.size(); i++) { 
        mMessageCallbacksList.get(i).deliveryComplete(token); 
       } 
      } 
     }; 
    } 

    public boolean isConnected() { 
     if (mClient == null) 
      return false; 
     else 
      return connected; 
    } 

    public void connect() { 

     try { 
      if (!isConnected()) { 

       MqttConnectOptions options = new MqttConnectOptions(); 

       String serverURI = ""; 

       options.setCleanSession(mCleanSessionDefault); 
       options.setConnectionTimeout(mTimeoutDefault); 
       options.setKeepAliveInterval(mKeepAliveDefault); 

       mClient = new MqttAndroidClient(mContext, "tcp://" + SERVER_URI + ":" + SERVER_PORT, mClientId); 
       mClient.setCallback(mClientCb); 

       mConnectionCb = new IMqttActionListener() { 
        @Override 
        public void onSuccess(IMqttToken iMqttToken) { 
         connected = true; 
         for (int i = 0; i < mMessageCallbacksList.size(); i++) { 
          mMessageCallbacksList.get(i).onConnectionSuccess(iMqttToken); 
         } 
        } 

        @Override 
        public void onFailure(IMqttToken iMqttToken, Throwable throwable) { 
         connected = false; 
         for (int i = 0; i < mMessageCallbacksList.size(); i++) { 
          mMessageCallbacksList.get(i).onConnectionFailure(iMqttToken, throwable); 
         } 
        } 
       }; 

       try { 
        mClient.connect(options, mContext, mConnectionCb); 
       } catch (MqttException e) { 
        e.printStackTrace(); 
       } 
      } else { 
       Log.v(TAG, "cant connect - already connected"); 
      } 
     } catch (IllegalArgumentException e) { 
      Log.v(TAG, "parameters error. cant connect"); 
     } 
    } 

    public void disconnect() { 

     if (isConnected()) { 
      try { 
       mClient.disconnect(mContext, mConnectionCb); 

      } catch (MqttException e) { 
       e.printStackTrace(); 
      } 
     } else { 
      Log.v(TAG, "cant disconnect - already disconnected"); 
     } 
    } 

    /** 
    * Publish a message to MQTT server 
    * 
    * @param topic  message topic 
    * @param message message body 
    * @param isRetained define if message should be retained on MQTT server 
    * @param listener completion listener (null allowed) 
    * @return 
    */ 
    public IMqttDeliveryToken publishMessage(String topic, String message, boolean isRetained, IMqttActionListener listener) { 

     if (isConnected()) { 

      MqttMessage mqttMessage = new MqttMessage(message.getBytes()); 
      mqttMessage.setRetained(isRetained); 
      mqttMessage.setQos(0); 

      try { 
       return mClient.publish(topic, mqttMessage, mContext, listener); 
      } catch (MqttPersistenceException e) { 
       e.printStackTrace(); 
      } catch (MqttException e) { 
       e.printStackTrace(); 
      } 
     } else { 
      Log.e(TAG, "cant publish message. Not connected"); 
     } 
     return null; 
    } 

    /** 
    * Subscribe to topic 
    * 
    * @param topic topic to subscribe 
    * @param listener completion listener (null allowed) 
    * @return 
    */ 
    public void subscribe(String topic, IMqttActionListener listener) { 

     if (isConnected()) { 
      try { 
       mClient.subscribe(topic, 0, mContext, listener); 
      } catch (MqttException e) { 
       e.printStackTrace(); 
      } 
     } else { 
      Log.e(TAG, "cant publish message. Not connected"); 
     } 
    } 

    /** 
    * Unsubscribe a topic 
    * 
    * @param topic topic to unsubscribe 
    * @param listener completion listener (null allowed) 
    */ 
    public void unsubscribe(String topic, IMqttActionListener listener) { 

     if (isConnected()) { 
      try { 
       mClient.unsubscribe(topic, mContext, listener); 
      } catch (MqttException e) { 
       e.printStackTrace(); 
      } 
     } else { 
      Log.e(TAG, "cant publish message. Not connected"); 
     } 
    } 

    public void addCallback(IMessageCallback callback) { 
     mMessageCallbacksList.add(callback); 
    } 
} 

С этим слушателем IMessageCallback.java:

/**
 * Listener for MQTT connection, delivery and message events.
 * Implementations are registered through {@code MqttHandler.addCallback}.
 */
public interface IMessageCallback { 

    /**
     * This method is called when the connection to the server is lost.
     *
     * @param cause the reason behind the loss of connection.
     */
    void connectionLost(Throwable cause); 

    /**
     * This method is called when a message arrives from the server.
     *
     * @param topic       name of the topic the message was published to
     * @param mqttMessage the actual message
     * @throws Exception if the implementation fails while handling the message
     */
    void messageArrived(String topic, MqttMessage mqttMessage) throws Exception; 

    /**
     * Called when delivery for a message has been completed, and all acknowledgments have been received.
     *
     * @param messageToken the delivery token associated with the message.
     */
    void deliveryComplete(IMqttDeliveryToken messageToken); 

    /**
     * Called when connection is established
     *
     * @param iMqttToken token for this connection
     */
    void onConnectionSuccess(IMqttToken iMqttToken); 

    /**
     * Called when connection has failed
     *
     * @param iMqttToken token when the failure occurred
     * @param throwable  exception
     */
    void onConnectionFailure(IMqttToken iMqttToken, Throwable throwable); 

    /**
     * Called when disconnection is successful
     *
     * @param iMqttToken token for this connection
     */
    void onDisconnectionSuccess(IMqttToken iMqttToken); 

    /**
     * Called when disconnection failed
     *
     * @param iMqttToken token when the failure occurred
     * @param throwable  exception
     */
    void onDisconnectionFailure(IMqttToken iMqttToken, Throwable throwable); 
} 

Вызывать это можно так:

// Usage example: create the handler, register a callback that logs every
// event and subscribes to "test" once connected, then start connecting.
final MqttHandler mqttHandler = new MqttHandler(mContext);

mqttHandler.addCallback(new IMessageCallback() {
    @Override
    public void connectionLost(Throwable cause) {
        Log.v(TAG, "connectionLost");
    }

    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
        Log.v(TAG, "messageArrived : " + topic + " : " + new String(mqttMessage.getPayload()));
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken messageToken) {
        try {
            // Per the Paho IMqttDeliveryToken contract, getMessage() may return
            // null once the message has been delivered, so guard before use.
            MqttMessage delivered = messageToken.getMessage();
            if (delivered == null) {
                Log.v(TAG, "deliveryComplete");
                return;
            }
            Log.v(TAG, "deliveryComplete : " + new String(delivered.getPayload()));
        } catch (MqttException e) {
            Log.e(TAG, "deliveryComplete : cannot read delivered message", e);
        }
    }

    @Override
    public void onConnectionSuccess(IMqttToken iMqttToken) {

        Log.v(TAG, "connection success");

        // Subscribe only after the connection is confirmed: MqttHandler.subscribe
        // refuses to run while the handler is not connected.
        mqttHandler.subscribe("test", new IMqttActionListener() {
            @Override
            public void onSuccess(IMqttToken asyncActionToken) {
                Log.v(TAG, "subscribe success");
            }

            @Override
            public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                Log.e(TAG, "subscribe failure");
            }
        });
    }

    @Override
    public void onConnectionFailure(IMqttToken iMqttToken, Throwable throwable) {
        Log.v(TAG, "connection failure");
    }

    @Override
    public void onDisconnectionSuccess(IMqttToken iMqttToken) {
        Log.v(TAG, "disconnection success");
    }

    @Override
    public void onDisconnectionFailure(IMqttToken iMqttToken, Throwable throwable) {
        Log.v(TAG, "disconnection failure");
    }
});

mqttHandler.connect();

Полный рабочий пример использования клиента Paho MQTT для Android можно найти здесь.

+0

Привет! Спасибо за ответ. Хотя это не совсем то, что я искал, он помог мне решить проблему: я не установил обратный вызов (callback) на своём клиенте. Проблему решает строка `mqClient.setCallback(this);` – spidergears