在上篇『ActiveMQ MQTT Server 架設』中我們已架設好MQTT Server,此篇我們將在Android上建立MQTT Client與MQTT Server做溝通.
此篇使用org.eclipse.paho來架設MQTT Client
Step 1: 引用org.eclipse.paho
在build.gradle中引用org.eclipse.paho
build.gradle1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| android { ... } repositories { maven { url "https://repo.eclipse.org/content/repositories/paho-snapshots/" } }
dependencies { ... implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0' implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1' }
|
Step 2: 建立MqttService
在AndroidManifest.xml中宣告權限及建立MqttService
AndroidManifest.xml1 2 3 4 5 6 7 8
| <uses-permission android:name="android.permission.ACCESS_NETWORK_STATE"/> <uses-permission android:name="android.permission.INTERNET"/> <uses-permission android:name="android.permission.WAKE_LOCK"/> <application> <service android:name="org.eclipse.paho.android.service.MqttService"/> </application>
|
Step 3: Implement MqttCallbackExtended
MainActivity.kt1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| private class MqttCallback : MqttCallbackExtended { override fun connectComplete(reconnect: Boolean, serverURI: String?) { Log.i(TAG, "connect Complete" + Thread.currentThread().id) }
override fun messageArrived(topic: String?, message: MqttMessage?) { Log.i(TAG, "topic: " + topic + " message: "+ String(message!!.payload, Charset.forName("utf-8")))
}
override fun connectionLost(cause: Throwable?) { Log.i(TAG, "connection Lost ") }
override fun deliveryComplete(token: IMqttDeliveryToken?) { Log.i(TAG, "delivery Complete ") } }
|
在這四個函式中,不可進行UI的操作。可使用Handler後再做UI的處理。
Step 4: 初始化MQTT Client
MainActivity.kt1 2 3 4 5 6 7 8 9 10
| private var mMqttClient: MqttAsyncClient? = null private val host = "tcp://192.168.0.98:1883" private val mClientID = "xx:xx:xx:xx:xx:xx" ...
if (null == mMqttClient) { mMqttClient = MQTTUtils.initClient(host, mClientID, MqttCallback()) }
|
我們使用一個公用object MQTTUtils 來處理初始化。
MQTTUtils.kt1 2 3 4 5 6 7 8 9 10
| object MQTTUtils { fun initClient(serverURI: String, clientId: String, callback: MqttCallback) :MqttAsyncClient? { var client: MqttAsyncClient? = null var persistence = MemoryPersistence() client = MqttAsyncClient (serverURI, clientId, persistence) client.setCallback(callback) return client } }
|
serverURI: MQTT Server網址
clientId: 唯一識別碼,告知Broker知道是哪個Client
callback: 回調函式
在上篇『ActiveMQ MQTT Server 架設』中,啟動Server時會得到Server的資訊,
其中會有MQTT的資訊。

在此port為1883
在程式中MQTT Server的網址設定為
tcp://xxx.xxx.xxx.xxx:1883
Step 5: 設定連接參數
在MQTTUtils中設置連接參數
MQTTUtils.kt1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| object MQTTUtils { fun initMqttConnectionOptions(): MqttConnectOptions { var mOptions = MqttConnectOptions() mOptions.isAutomaticReconnect = false mOptions.isCleanSession = true mOptions.connectionTimeout = 10 mOptions.userName = "admin" mOptions.password = "admin".toCharArray() mOptions.keepAliveInterval = 10 mOptions.maxInflight = 10 mOptions.mqttVersion = MqttConnectOptions.MQTT_VERSION_3_1_1 return mOptions } }
|
參數說明
isAutomaticReconnect: 連接中斷時,是否會自動連接
isCleanSession: 若斷開連線時是否清除連接資訊
connectionTimeout: 連線逾時時間,以秒為單位
userName:帳號
password: 密碼
keepAliveInterval: Heartbeat,每隔幾秒會再度確認是否連線
maxInflight: 允許同時發送幾條訊息
mqttVersion: MQTT 版本
Step 6: 與MQTT Server連線
MainActivity.kt1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| private val mTopicID = "topic1"
var options = MQTTUtils.initMqttConnectionOptions()
mMqttClient!!.connect(options, null, object : IMqttActionListener { override fun onSuccess(asyncActionToken: IMqttToken) { mMqttClient!!.subscribe(mTopicID, 1) }
override fun onFailure(asyncActionToken: IMqttToken, exception: Throwable) { } })
|
連線成功後,傳送溝通的Topic: mTopicID
執行結果
連接上後,透過ActiveMQ的管理界面,可看到Topics有剛剛所設定的mTopicID: topic1。

透過此Topic 可與MQTT Client做溝通。

Source Code
MainActivity.kt1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
| class MainActivity : AppCompatActivity(), View.OnClickListener { private val host = "tcp://xxx.xxx.xxx.xxx:1883" private val mClientID = "xx:xx:xx:xx:xx:xx" private val mTopicID = "topic1" private var mMqttClient: MqttAsyncClient? = null
override fun onCreate(savedInstanceState: Bundle?) { super.onCreate(savedInstanceState) setContentView(R.layout.activity_main)
var btConnect = findViewById<Button>(R.id.bt_connect) btConnect.setOnClickListener(this) }
override fun onDestroy() { super.onDestroy() if (null != mMqttClient) { mMqttClient!!.disconnect() } }
private fun connect() { Thread(Runnable { try { if (null == mMqttClient) { mMqttClient = MQTTUtils.initClient(host, mClientID, MqttCallback()) } var options = MQTTUtils.initMqttConnectionOptions()
mMqttClient!!.connect(options, null, object : IMqttActionListener { override fun onSuccess(asyncActionToken: IMqttToken) { mMqttClient!!.subscribe(mTopicID, 1) }
override fun onFailure(asyncActionToken: IMqttToken, exception: Throwable) { } })
} catch (e: Exception) { e.printStackTrace() } }).start() }
private class MqttCallback : MqttCallbackExtended{ val TAG = "MqttCallback" override fun connectComplete(reconnect: Boolean, serverURI: String?) { Log.i(TAG, "connect Complete" + Thread.currentThread().id) }
override fun messageArrived(topic: String?, message: MqttMessage?) { Log.i(TAG, "topic: " + topic + " message: "+ String(message!!.payload, Charset.forName("utf-8")))
}
override fun connectionLost(cause: Throwable?) { Log.i(TAG, "connection Lost ") }
override fun deliveryComplete(token: IMqttDeliveryToken?) { Log.i(TAG, "delivery Complete ") } }
override fun onClick(v: View?) { when(v!!.id) { R.id.bt_connect -> connect() } }
}
|
MQTTUtils.kt1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| object MQTTUtils { fun initMqttConnectionOptions(): MqttConnectOptions { var mOptions = MqttConnectOptions() mOptions.isAutomaticReconnect = false mOptions.isCleanSession = true mOptions.connectionTimeout = 10 mOptions.userName = "admin" mOptions.password = "admin".toCharArray() mOptions.keepAliveInterval = 10 mOptions.maxInflight = 10 mOptions.mqttVersion = MqttConnectOptions.MQTT_VERSION_3_1_1 return mOptions }
fun initClient(serverURI: String, clientId: String, callback: MqttCallback) :MqttAsyncClient? { var client: MqttAsyncClient? = null var persistence = MemoryPersistence() client = MqttAsyncClient (serverURI, clientId, persistence) client.setCallback(callback) return client } }
|