Android MQTT Client

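In the previous post, "Setting Up an ActiveMQ MQTT Server", we set up an MQTT server. In this post we build an MQTT client on Android that communicates with that server.

The client is built with the org.eclipse.paho library.

Step 1: Add the org.eclipse.paho dependency

Add org.eclipse.paho to build.gradle.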

build.gradle
android {
    ...
}

repositories {
    maven {
        url "https://repo.eclipse.org/content/repositories/paho-snapshots/"
    }
}

dependencies {
    ...
    implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0'
    implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
}

Step 2: Declare MqttService

Declare the required permissions and register MqttService in AndroidManifest.xml.

AndroidManifest.xml
<uses-permission android:name="android.permission.ACCESS_NETWORK_STATE"/>
<uses-permission android:name="android.permission.INTERNET"/>
<uses-permission android:name="android.permission.WAKE_LOCK"/>

<application>
    <service android:name="org.eclipse.paho.android.service.MqttService"/>
</application>

Step 3: Implement MqttCallbackExtended

MainActivity.kt
private class MqttCallback : MqttCallbackExtended {
    private val TAG = "MqttCallback"

    // Called when the connection (or a reconnection) to the broker completes.
    override fun connectComplete(reconnect: Boolean, serverURI: String?) {
        Log.i(TAG, "connect complete, thread " + Thread.currentThread().id)
    }

    // Called for every message that arrives on a subscribed topic.
    override fun messageArrived(topic: String?, message: MqttMessage?) {
        val text = message?.payload?.let { String(it, Charsets.UTF_8) }
        Log.i(TAG, "topic: $topic message: $text")
    }

    // Called when the connection to the broker is lost.
    override fun connectionLost(cause: Throwable?) {
        Log.i(TAG, "connection lost")
    }

    // Called when delivery of a published message has completed.
    override fun deliveryComplete(token: IMqttDeliveryToken?) {
        Log.i(TAG, "delivery complete")
    }
}

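None of these four callbacks may touch the UI directly, because they do not run on the main thread. Hand the result to the main thread first, for example with a Handler, and update the UI there.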
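The hand-off to the UI thread can be sketched roughly as follows. This is a minimal illustration, not code from the original project: `UiMessageDispatcher` is a hypothetical helper, and it takes a `java.util.concurrent.Executor` instead of a `Handler` so that the sketch also runs on a plain JVM. The `main` function uses a fake executor as a stand-in for the main thread.

```kotlin
import java.util.concurrent.Executor

// Hypothetical helper (not part of Paho): decodes the payload on the MQTT
// callback thread, then hands the text to the UI through an Executor.
class UiMessageDispatcher(
    private val uiExecutor: Executor,
    private val onText: (topic: String, text: String) -> Unit
) {
    fun dispatch(topic: String, payload: ByteArray) {
        val text = String(payload, Charsets.UTF_8) // decode off the UI thread
        uiExecutor.execute { onText(topic, text) } // UI update runs on the executor
    }
}

fun main() {
    // Stand-in for the main thread: tasks are queued until we drain them.
    val pending = ArrayList<Runnable>()
    val fakeUiThread = Executor { task -> pending.add(task) }
    val shown = ArrayList<String>()

    val dispatcher = UiMessageDispatcher(fakeUiThread) { topic, text ->
        shown.add("$topic: $text")
    }
    dispatcher.dispatch("topic1", "hello".toByteArray(Charsets.UTF_8))

    // Nothing is shown until the "UI thread" runs its queued tasks.
    pending.forEach { it.run() }
    println(shown) // [topic1: hello]
}
```

On Android, the executor could simply forward each task to `Handler(Looper.getMainLooper()).post(...)`, and `messageArrived` would call `dispatch` with the topic and `message.payload`.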

Step 4: Initialize the MQTT Client

MainActivity.kt
private var mMqttClient: MqttAsyncClient? = null
private val host = "tcp://192.168.0.98:1883"
private val mClientID = "xx:xx:xx:xx:xx:xx"
...

if (null == mMqttClient) {
    mMqttClient = MQTTUtils.initClient(host, mClientID, MqttCallback())
}

A shared utility object, MQTTUtils, handles the initialization.

MQTTUtils.kt
object MQTTUtils {
    // Creates an async client with in-memory persistence and attaches the callback.
    fun initClient(serverURI: String, clientId: String, callback: MqttCallback): MqttAsyncClient {
        val persistence = MemoryPersistence()
        val client = MqttAsyncClient(serverURI, clientId, persistence)
        client.setCallback(callback)
        return client
    }
}

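serverURI: the address of the MQTT server
clientId: a unique identifier that tells the broker which client is connecting
callback: the callback object that receives connection and message events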
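Because the broker uses clientId to tell clients apart, two devices sharing one ID will interfere with each other. One way to avoid that, shown here only as a sketch, is to generate a random ID per installation. `newClientId` and the `"android"` prefix are hypothetical names; the 23-character limit is the length that every MQTT 3.1.1 broker is required to accept, although many brokers allow longer IDs.

```kotlin
import java.util.UUID

// Hypothetical helper: prefix plus random hex digits, trimmed to 23 characters.
fun newClientId(prefix: String = "android"): String {
    require(Regex("[0-9A-Za-z]{1,15}").matches(prefix)) {
        "prefix must be 1 to 15 ASCII letters or digits"
    }
    // A UUID without dashes gives 32 hex characters of randomness.
    val random = UUID.randomUUID().toString().replace("-", "")
    return (prefix + random).take(23)
}

fun main() {
    val first = newClientId()
    val second = newClientId()
    println("$first (${first.length} characters)")
    println("different from the next one: ${first != second}")
}
```

The generated value would need to be stored (for example in preferences) if the same ID should be reused across app restarts.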

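As described in the previous post, "Setting Up an ActiveMQ MQTT Server", the server prints its connection information when it starts, including the MQTT connector.

In this setup the MQTT port is 1883.

In the code, the MQTT server URI is therefore set to
tcp://xxx.xxx.xxx.xxx:1883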
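If the host and port come from configuration rather than a hard-coded string, the URI can be assembled with a small helper. This is only a sketch: `brokerUri` is a hypothetical function, and `broker.example` and port 8883 are placeholder values. The `tcp://` and `ssl://` schemes are the ones Paho accepts for plain and TLS connections.

```kotlin
// Hypothetical helper: builds a Paho server URI from host, port and TLS flag.
fun brokerUri(host: String, port: Int = 1883, tls: Boolean = false): String {
    require(host.isNotBlank()) { "host must not be blank" }
    require(port in 1..65535) { "port out of range: $port" }
    val scheme = if (tls) "ssl" else "tcp"
    return "$scheme://$host:$port"
}

fun main() {
    println(brokerUri("192.168.0.98"))                     // tcp://192.168.0.98:1883
    println(brokerUri("broker.example", 8883, tls = true)) // ssl://broker.example:8883
}
```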

Step 5: Set the Connection Options

Configure the connection options in MQTTUtils.

MQTTUtils.kt
object MQTTUtils {
    fun initMqttConnectionOptions(): MqttConnectOptions {
        return MqttConnectOptions().apply {
            isAutomaticReconnect = false
            isCleanSession = true
            connectionTimeout = 10
            userName = "admin"
            password = "admin".toCharArray()
            keepAliveInterval = 10
            maxInflight = 10
            mqttVersion = MqttConnectOptions.MQTT_VERSION_3_1_1
        }
    }
}

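Parameters
isAutomaticReconnect: whether the client reconnects automatically after the connection is lost
isCleanSession: whether the session state is discarded when the client disconnects
connectionTimeout: connection timeout, in seconds
userName: account name
password: password
keepAliveInterval: heartbeat interval in seconds; the client checks at this interval that the connection is still alive
maxInflight: maximum number of messages that may be in flight (sent but not yet acknowledged) at the same time
mqttVersion: MQTT protocol version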
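Since the options above set isAutomaticReconnect to false, the app itself has to decide when to retry after connectionLost is called. One common approach, which is not part of the original code, is exponential backoff. The sketch below only computes the delay; `reconnectDelayMs` and its default values are hypothetical.

```kotlin
// Hypothetical helper: delay before reconnect attempt `attempt` (0-based).
// The delay doubles from baseMs on each attempt and is capped at maxMs.
fun reconnectDelayMs(attempt: Int, baseMs: Long = 1_000, maxMs: Long = 60_000): Long {
    require(attempt >= 0) { "attempt must be non-negative" }
    require(baseMs > 0 && maxMs >= baseMs) { "need 0 < baseMs <= maxMs" }
    var delay = baseMs
    repeat(attempt) {
        if (delay >= maxMs) return maxMs // already at the cap, stop doubling
        delay *= 2
    }
    return minOf(delay, maxMs)
}

fun main() {
    // Delays for the first eight attempts with the default settings.
    println((0..7).map { reconnectDelayMs(it) })
    // [1000, 2000, 4000, 8000, 16000, 32000, 60000, 60000]
}
```

A caller would typically wait for the returned delay, call connect again, and reset the attempt counter once connectComplete fires.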

Step 6: Connect to the MQTT Server

MainActivity.kt
private val mTopicID = "topic1"

val options = MQTTUtils.initMqttConnectionOptions()

mMqttClient?.connect(options, null, object : IMqttActionListener {
    override fun onSuccess(asyncActionToken: IMqttToken) {
        // Subscribe with QoS 1 once the connection is established.
        mMqttClient?.subscribe(mTopicID, 1)
    }

    override fun onFailure(asyncActionToken: IMqttToken, exception: Throwable) {
        // Report the failure instead of ignoring it silently.
        exception.printStackTrace()
    }
})

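Once the connection succeeds, the client subscribes to the topic used for communication, mTopicID.

Result

After the client connects, the ActiveMQ admin console lists the topic that was just set as mTopicID, topic1, under Topics.

Messages sent to this topic reach the MQTT client.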

Source Code

MainActivity.kt
class MainActivity : AppCompatActivity(), View.OnClickListener {
    private val host = "tcp://xxx.xxx.xxx.xxx:1883"
    private val mClientID = "xx:xx:xx:xx:xx:xx"
    private val mTopicID = "topic1"
    private var mMqttClient: MqttAsyncClient? = null

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        val btConnect = findViewById<Button>(R.id.bt_connect)
        btConnect.setOnClickListener(this)
    }

    override fun onDestroy() {
        super.onDestroy()
        // Only disconnect when a connection is actually open.
        mMqttClient?.takeIf { it.isConnected }?.disconnect()
    }

    private fun connect() {
        Thread {
            try {
                if (null == mMqttClient) {
                    mMqttClient = MQTTUtils.initClient(host, mClientID, MqttCallback())
                }
                val options = MQTTUtils.initMqttConnectionOptions()

                mMqttClient?.connect(options, null, object : IMqttActionListener {
                    override fun onSuccess(asyncActionToken: IMqttToken) {
                        // Subscribe with QoS 1 once the connection is established.
                        mMqttClient?.subscribe(mTopicID, 1)
                    }

                    override fun onFailure(asyncActionToken: IMqttToken, exception: Throwable) {
                        exception.printStackTrace()
                    }
                })
            } catch (e: Exception) {
                e.printStackTrace()
            }
        }.start()
    }

    private class MqttCallback : MqttCallbackExtended {
        private val TAG = "MqttCallback"

        override fun connectComplete(reconnect: Boolean, serverURI: String?) {
            Log.i(TAG, "connect complete, thread " + Thread.currentThread().id)
        }

        override fun messageArrived(topic: String?, message: MqttMessage?) {
            val text = message?.payload?.let { String(it, Charsets.UTF_8) }
            Log.i(TAG, "topic: $topic message: $text")
        }

        override fun connectionLost(cause: Throwable?) {
            Log.i(TAG, "connection lost")
        }

        override fun deliveryComplete(token: IMqttDeliveryToken?) {
            Log.i(TAG, "delivery complete")
        }
    }

    override fun onClick(v: View?) {
        when (v?.id) {
            R.id.bt_connect -> connect()
        }
    }
}

MQTTUtils.kt
import org.eclipse.paho.client.mqttv3.MqttAsyncClient
import org.eclipse.paho.client.mqttv3.MqttCallback
import org.eclipse.paho.client.mqttv3.MqttConnectOptions
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence

object MQTTUtils {
    fun initMqttConnectionOptions(): MqttConnectOptions {
        return MqttConnectOptions().apply {
            isAutomaticReconnect = false
            isCleanSession = true
            connectionTimeout = 10
            userName = "admin"
            password = "admin".toCharArray()
            keepAliveInterval = 10
            maxInflight = 10
            mqttVersion = MqttConnectOptions.MQTT_VERSION_3_1_1
        }
    }

    // Creates an async client with in-memory persistence and attaches the callback.
    fun initClient(serverURI: String, clientId: String, callback: MqttCallback): MqttAsyncClient {
        val client = MqttAsyncClient(serverURI, clientId, MemoryPersistence())
        client.setCallback(callback)
        return client
    }
}

Author

Nick Lin

Published

2020-02-14

Updated

2023-01-18
