前言
之前写了一篇为什么智能硬件首选MQTT – ,这次就来搭建一个自己的MQTT交互渠道,实际体验一下,没有实战怎么能行。
一、服务端预备
1. 挑选渠道
我这儿用的渠道是EMQX Cloud,能够通过github账号免费请求一个MQTT服务器,对于个人运用来说特别方便,一起运用运用 MQTT 客户端快速测试 MQTT 服务去监听或许模拟下发,这儿咱们挑选免费敞开,点击当即布置然后一向同意就树立好了。
2. 发动服务
树立好今后咱们点击项目管理,里边就会呈现一个咱们刚请求的服务器,进去后点击发动,这样咱们就把服务发动起来了。
3. 创立用户
点击认证鉴权后挑选认证,然后点击右边的添加,即可创立咱们的衔接用户,这个用户的称号和暗码就是咱们客户端一会树立衔接的时候需求的username和password。至此咱们就能够去客户端去写衔接代码了。
二、客户端搭建
1. 引进
dependencies {
implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.4'
implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
}
2. AndroidManifest.xml 配置
<uses-permission android:name="android.permission.INTERNET" />
<uses-permission android:name="android.permission.WAKE_LOCK" />
<uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" />
<application
...
<service android:name="org.eclipse.paho.android.service.MqttService" />
</application>
3. 创立MQTT客户端
private static MqttAndroidClient mqttAndroidClient;
private static String mqttUsername = ""; //服务端创立的用户名
private static String mqttPassword = ""; //服务端吧创立的用户名暗码
private static String clientId = ""; //唯一标识不行重复
//承受音讯的行列
public static final LinkedBlockingQueue<MyMessage> SERVER_QUEUE = new LinkedBlockingQueue<>(
200);
//音讯订阅的topic,能够自定义
private static final String topic = "/" + mqttUsername + "/" + clientId + "/user/get";
public static void initIot() {
String serverUrl = "服务器地址:端口";
try {
mqttAndroidClient = new MqttAndroidClient(context, serverUrl, "clientId");
mqttAndroidClient.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
Log.i(TAG, "衔接断开");
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
Log.i(TAG, "收到音讯:" + message.toString());
//主张运用行列接收
MyMessage myMessage = new MyMessage();
myMessage.setData(message.getPayload());
boolean offer = SERVER_QUEUE.offer(aMessage);
if (!offer) {
Log.e(TAG, "行列已满,无法承受音讯!");
}
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
Log.i(TAG, "deliveryComplete: " + token.toString());
}
});
//树立衔接规则
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName(mqttUsername);
options.setPassword(mqttPassword.toCharArray());
options.setCleanSession(true);
options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1); //MQTT版别
options.setConnectionTimeout(10); //衔接超时时间
options.setKeepAliveInterval(180); //心跳间隔时间
options.setMaxInflight(100); //最大请求数,默许10,高流量场景能够增大该值
options.setAutomaticReconnect(true); //设置主动重新衔接
mqttAndroidClient.connect(options, null, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
Log.i(TAG, "衔接成功");
//这儿订阅音讯
subscribe();
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
Log.i(TAG, "衔接失利" + exception);
}
});
} catch (Exception e) {
Log.e(TAG, "INIT IOT ERROR!");
}
}
public class MyMessage {
public Object data;
public MyMessage() {
}
public MyMessage(Object data) {
this.data = data;
}
public Object getData() {
return this.data;
}
public void setData(Object data) {
this.data = data;
}
}
4. 订阅音讯
private static void subscribe() {
try {
mqttAndroidClient.subscribe(topic, 1, null,
new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
Log.i(TAG,
"订阅成功 topic: "
+ topic);
}
@Override
public void onFailure(IMqttToken asyncActionToken,
Throwable exception) {
Log.e(TAG, "订阅失利!" + exception.getMessage());
}
});
} catch (Exception e) {
Log.e(TAG, "订阅失利!" + e.getMessage());
}
}
5. 发布音讯
//音讯发送行列
public static final LinkedBlockingQueue<String> CLIENT_QUEUE = new LinkedBlockingQueue<>(1000);
//发布音讯调用这个办法
public static void putQueue(String msg) {
boolean offer = CLIENT_QUEUE.offer(msg);
if (!offer) {
Log.w(TAG, "操作行列已满!");
}
}
//运用线程去读取行列,这样能够避免同一时间多处调用,一起也不会让发送事件丢掉
static class IotPublishRunnable implements Runnable {
@Override
public void run() {
while (true) {
try {
String msg = CLIENT_QUEUE.take();
if (TextUtils.isEmpty(msg)) {
continue;
}
publish(msg);
Thread.sleep(300);
} catch (Exception e) {
Log.e(TAG, "处理iot音讯失利");
}
}
}
}
private static void publishNew(String payload) {
String topic = "/" + mqttUsername + "/" + clientId + "/user/update";
Integer qos = 1;
try {
if (null == mqttAndroidClient || !mqttAndroidClient.isConnected()) {
Log.w(TAG, "IOT还未初始化!无法发送音讯");
return;
}
mqttAndroidClient.publish(topic, payload.getBytes(StandardCharsets.UTF_8), qos, false,
null, new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
String[] topics = asyncActionToken.getTopics();
Log.e(TAG, "publish message error! topics: " + Arrays.toString(topics));
}
});
} catch (MqttException e) {
Log.e(TAG, "发送音讯失利!");
} catch (IllegalArgumentException e) {
Log.e(TAG, "MQTT CLIENT ERROR");
}
}
6. 断开衔接
public static void disconnect() {
if (null == mqttAndroidClient || !mqttAndroidClient.isConnected()) {
Log.w(TAG, "IOT还未初始化!");
return;
}
try {
mqttAndroidClient.disconnect().setActionCallback(new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
Log.i(TAG, "断开衔接成功!");
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
Log.i(TAG, "断开衔接失利!");
}
});
} catch (MqttException e) {
Log.e(TAG, e.getMessage());
}
}
结束
以上就是客户端的MQTT代码,我是用Java写的,Kotlin版的主张参阅Android 运用 Kotlin 衔接 MQTT,代码根本就在这儿了,项目啥的就不放了。