前言

之前写了一篇为什么智能硬件首选MQTT – ,这次就来搭建一个自己的MQTT交互渠道,实际体验一下,没有实战怎么能行。

一、服务端预备

1. 挑选渠道

我这儿用的渠道是EMQX Cloud,能够通过github账号免费请求一个MQTT服务器,对于个人运用来说特别方便,一起运用运用 MQTT 客户端快速测试 MQTT 服务去监听或许模拟下发,这儿咱们挑选免费敞开,点击当即布置然后一向同意就树立好了。

MQTT这么好玩不来自己搭建一个吗

2. 发动服务

树立好今后咱们点击项目管理,里边就会呈现一个咱们刚请求的服务器,进去后点击发动,这样咱们就把服务发动起来了。

MQTT这么好玩不来自己搭建一个吗
MQTT这么好玩不来自己搭建一个吗

3. 创立用户

点击认证鉴权后挑选认证,然后点击右边的添加,即可创立咱们的衔接用户,这个用户的称号和暗码就是咱们客户端一会树立衔接的时候需求的username和password。至此咱们就能够去客户端去写衔接代码了。

MQTT这么好玩不来自己搭建一个吗

二、客户端搭建

1. 引进

dependencies {
    implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.4'
    implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1' 
}

2. AndroidManifest.xml 配置

<uses-permission android:name="android.permission.INTERNET" />
<uses-permission android:name="android.permission.WAKE_LOCK" />
<uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" />
<application
   ...
   <service android:name="org.eclipse.paho.android.service.MqttService" />
</application>

3. 创立MQTT客户端

private static MqttAndroidClient mqttAndroidClient;
private static String mqttUsername = ""; //服务端创立的用户名
private static String mqttPassword = ""; //服务端吧创立的用户名暗码
private static String clientId = ""; //唯一标识不行重复
 //承受音讯的行列
public static final LinkedBlockingQueue<MyMessage> SERVER_QUEUE = new LinkedBlockingQueue<>(
            200);
//音讯订阅的topic,能够自定义
private static final String topic = "/" + mqttUsername + "/" + clientId + "/user/get"; 
public static void initIot() {
        String serverUrl = "服务器地址:端口";
        try {
            mqttAndroidClient = new MqttAndroidClient(context, serverUrl, "clientId");
            mqttAndroidClient.setCallback(new MqttCallback() {
                @Override
                public void connectionLost(Throwable cause) {
                    Log.i(TAG, "衔接断开");
                }
                @Override
                public void messageArrived(String topic, MqttMessage message) throws Exception {
                    Log.i(TAG, "收到音讯:" + message.toString());
                    //主张运用行列接收
                    MyMessage myMessage = new MyMessage();
                    myMessage.setData(message.getPayload());
                    boolean offer = SERVER_QUEUE.offer(aMessage);
                    if (!offer) {
                        Log.e(TAG, "行列已满,无法承受音讯!");
                    }
                }
                @Override
                public void deliveryComplete(IMqttDeliveryToken token) {
                    Log.i(TAG, "deliveryComplete: " + token.toString());
                }
            });
            //树立衔接规则
            MqttConnectOptions options = new MqttConnectOptions();
            options.setUserName(mqttUsername);
            options.setPassword(mqttPassword.toCharArray());
            options.setCleanSession(true);
            options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1); //MQTT版别
            options.setConnectionTimeout(10); //衔接超时时间
            options.setKeepAliveInterval(180); //心跳间隔时间
            options.setMaxInflight(100); //最大请求数,默许10,高流量场景能够增大该值
            options.setAutomaticReconnect(true); //设置主动重新衔接
            mqttAndroidClient.connect(options, null, new IMqttActionListener() {
                @Override
                public void onSuccess(IMqttToken asyncActionToken) {
                    Log.i(TAG, "衔接成功");
                    //这儿订阅音讯
                    subscribe();
                }
                @Override
                public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                    Log.i(TAG, "衔接失利" + exception);
                }
            });
        } catch (Exception e) {
            Log.e(TAG, "INIT IOT ERROR!");
        }
    }
public class MyMessage {
    public Object data;
    public MyMessage() {
    }
    public MyMessage(Object data) {
        this.data = data;
    }
    public Object getData() {
        return this.data;
    }
    public void setData(Object data) {
        this.data = data;
    }
}

4. 订阅音讯

private static void subscribe() {
        try {
            mqttAndroidClient.subscribe(topic, 1, null,
                    new IMqttActionListener() {
                        @Override
                        public void onSuccess(IMqttToken asyncActionToken) {
                            Log.i(TAG,
                                    "订阅成功 topic: "
                                            + topic);
                        }
                        @Override
                        public void onFailure(IMqttToken asyncActionToken,
                                              Throwable exception) {
                            Log.e(TAG, "订阅失利!" + exception.getMessage());
                        }
                    });
        } catch (Exception e) {
            Log.e(TAG, "订阅失利!" + e.getMessage());
        }
    }

5. 发布音讯

//音讯发送行列
public static final LinkedBlockingQueue<String> CLIENT_QUEUE = new LinkedBlockingQueue<>(1000);
//发布音讯调用这个办法
public static void putQueue(String msg) {
        boolean offer = CLIENT_QUEUE.offer(msg);
        if (!offer) {
            Log.w(TAG, "操作行列已满!");
        }
    }
//运用线程去读取行列,这样能够避免同一时间多处调用,一起也不会让发送事件丢掉
static class IotPublishRunnable implements Runnable {
        @Override
        public void run() {
            while (true) {
                try {
                    String msg = CLIENT_QUEUE.take();
                    if (TextUtils.isEmpty(msg)) {
                        continue;
                    }
                    publish(msg);
                    Thread.sleep(300);
                } catch (Exception e) {
                    Log.e(TAG, "处理iot音讯失利");
                }
            }
        }
    }
private static void publishNew(String payload) {
        String topic = "/" + mqttUsername + "/" + clientId + "/user/update";
        Integer qos = 1;
        try {
            if (null == mqttAndroidClient || !mqttAndroidClient.isConnected()) {
                Log.w(TAG, "IOT还未初始化!无法发送音讯");
                return;
            }
            mqttAndroidClient.publish(topic, payload.getBytes(StandardCharsets.UTF_8), qos, false,
                    null, new IMqttActionListener() {
                        @Override
                        public void onSuccess(IMqttToken asyncActionToken) {
                        }
                        @Override
                        public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                            String[] topics = asyncActionToken.getTopics();
                            Log.e(TAG, "publish message error! topics: " + Arrays.toString(topics));
                        }
                    });
        } catch (MqttException e) {
            Log.e(TAG, "发送音讯失利!");
        } catch (IllegalArgumentException e) {
            Log.e(TAG, "MQTT CLIENT ERROR");
        }
    }

6. 断开衔接

public static void disconnect() {
        if (null == mqttAndroidClient || !mqttAndroidClient.isConnected()) {
            Log.w(TAG, "IOT还未初始化!");
            return;
        }
        try {
            mqttAndroidClient.disconnect().setActionCallback(new IMqttActionListener() {
                @Override
                public void onSuccess(IMqttToken asyncActionToken) {
                    Log.i(TAG, "断开衔接成功!");
                }
                @Override
                public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                    Log.i(TAG, "断开衔接失利!");
                }
            });
        } catch (MqttException e) {
            Log.e(TAG, e.getMessage());
        }
    }

结束

以上就是客户端的MQTT代码,我是用Java写的,Kotlin版的主张参阅Android 运用 Kotlin 衔接 MQTT,代码根本就在这儿了,项目啥的就不放了。