最近在忙事务的间隙,穿插着做了些功能测验。

一、背景简介

事务背景大约介绍一下,便是依照国标规定,车辆需求上传一些指定的数据到ZF的指定渠道,一起车辆也会把数据传到企业云端服务上,所以乎就发生了一些功能需求。

现在咱们仅仅先简略的进行了一个功能场景的测验,便是评价现在服务是否能够支撑,预期的最大一起在线车辆上传数据。通过评价,在线车辆数据依照预期的10倍来进行的,而且后边添加持续运转12h检查服务链路的安稳性。

本篇并不是一个严谨的功能测验过程成果共享,主要是共享下关于mqtt协议服务的压测脚本的编写。因为之前我也没触摸过MQTT协议的压测,网上关于相关的压测脚本的内容也比较杂乱,所以记载一下,仅供参考。

捋一下链路就知道需求生成哪些数据(因为服务还未上线运用,所以发生的压测数据后边能够直接整理掉即可。):

  1. 一些前置数据:比方数据库、缓存里涉及到的车辆数据,通讯秘钥数据等等,这些能够之前写脚本一次性生成即可。
  2. 车辆上报的数据:车辆上报到云端的数据,是通过一系列加密转码,期间还要设计到解密等,这个通过评价,能够简化其间的某些环境,所以所有的车能够直接发送相同的数据即可。
  3. 车辆数据:最终便是生成对应的车辆数据,一起在线,依照评价的频率发送数据。

其间第1、2的数据在之前针对性的别离生成即可,第3步的车辆发送数据便是压测脚本要干的工作了。

二、技术选型

这个却是很快,搜索引擎大约搜了一下,内容很少,或者说对我有用的内容很少。有看到jmeter有相关插件的,但是这个方案基本上我都是否决的,一来我不擅长用,而来我觉得用起来肯定会比自己编码要费事的多。

所以就持续编码好了,依然首选python,想到了locust库,后来看官方文档的时分,看到locust也针对mqtt协议拓宽了一些内容。但是我测验下来不太符合我这的需求,也可能当时我用的不对吧,所以就只能自己来从零开端编写了。

搜索中又发现Python中用于mqtt协议的库叫paho.mqtt,支撑衔接署理,音讯的订阅、收发等等,所以最终确认运用:locust+paho.mqtt的组合来完成本次的负载脚本。

三、代码编写

1. 脚本代码

暂时没做代码分层,现在场景简略,就直接都放一个模块里了,有点长,先贴上来,后边部分会对脚本的要点内容进行拆解。

脚本现在做了这些工作:

  • 从db中查询有用可用的所有测验车辆信息数据
  • 依据命令行的输入参数,指定发动的车辆数,以及与broker署理树立衔接的频率
  • 树立衔接成功的车辆,就能够依据脚本里指定的频次,来像broker发送数据
  • 脚本核算衔接数、恳求数、呼应时刻等信息写到报表中
  • 调试遇到车辆会批量断开衔接的状况,添加了当车辆断开衔接时,把断开时刻、车辆信息写到本地csv中,便利第二天来检查分析。
import csv
import datetime
import queue
import os
import sys
import time
import ssl
from paho.mqtt import client as mqtt_client
# 依据不同系统进行路径适配
if os.name == "nt":
    path = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
    sys.path.insert(0, path)
    from GB_test.utils.mysql_operating import DB
elif os.name == "posix":
    sys.path.append("/app/qa_test_app/")
    from GB_test.utils.mysql_operating import DB
from locust import User, TaskSet, events, task, between, run_single_user
BROKER_ADDRESS = "broker服务地址"
PORT = 1111
PASSWORD = "111111"
PUBLISH_TIMEOUT = 10000  # 超时时刻
TEST_TOPIC = "test_topic"
TEST_VALUE = [16, 3, -26, 4, 0, 36,.......]  # 用来publish的测验数据,仅示意
BYTES_DATA = bytes(i % 256 for i in TEST_VALUE)  # 事务需求转换成 byte 类型后再发送
# 创建行列
client_queue = queue.Queue()
# 衔接DB,读取车辆数据
db = DB("db_vmd")
select_sql = "select xxxx"  
client_list = db.fetch_all(select_sql)
print("车辆数据查询完毕,数据量:{}".format(len(client_list)))
for t in client_list:
    # 把可用的车辆信息存到行列中去
    client_queue.put(t)
def fire_success(**kwargs):
    """恳求成功时调用"""
    events.request.fire(**kwargs)
def calculate_resp_time(t1, t2):
    """核算呼应时刻"""
    return int((t2 - t1) * 1000)
class MQTTMessage:
    """已发送的音讯实体类"""
    def __init__(self, _type, qos, topic, payload, start_time, timeout):
        self.type = _type,
        self.qos = qos,
        self.topic = topic
        self.payload = payload
        self.start_time = start_time
        self.timeout = timeout
# 核算总共发送成功的音讯数量
total_published = 0
disconnect_record_list = []  # 界说寄存衔接断开的记载的列表容器
class PublishTask(TaskSet):
    @task
    def task_publish(self):
        self.client.loop_start()
        topic = TEST_TOPIC
        payload = BYTES_DATA
        # 记载发送的开端时刻
        start_time = time.time()
        mqtt_msg_info = self.client.publish(topic, payload, qos=1, retain=False)
        published_mid = mqtt_msg_info.mid
        # 将发送成功的音讯内容,放入client实例的 published_message 字段
        self.client.published_message[published_mid] = MQTTMessage(REQUEST_TYPE,
                                                                   0,
                                                                   topic,
                                                                   payload,
                                                                   start_time,
                                                                   PUBLISH_TIMEOUT)
        # 发送成功回调
        self.client.on_publish = self.on_publish
        # 断开衔接回调
        self.client.on_disconnect = self.on_disconnect
    @staticmethod
    def on_disconnect(client, userdata, rc):
        """ broker衔接断开,放入列表容器"""
        disconnected_info = [str(client._client_id), rc, datetime.datetime.now()]
        disconnect_record_list.append(disconnected_info)
        print("rc状况:{} - -".format(rc), "{}-broker衔接已断开".format(str(client._client_id)))
    @staticmethod
    def on_publish(client, userdata, mid):
        if mid:
            # 记载音讯发送成功的时刻
            end_time = time.time()
            # 从已发送的音讯容器中,取出音讯
            message = client.published_message.pop(mid, None)
            # 核算开端发送到发送成功的耗时
            publish_resp_time = calculate_resp_time(message.start_time, end_time)
            fire_success(
                request_type="p_success",
                name="client_id: " + str(client._client_id),
                response_time=publish_resp_time,
                response_length=len(message.payload),
                exception=None,
                context=None
            )
            global total_published
            # 成功发送累加1
            total_published += 1
class MQTTLocustUser(User):
    tasks = [PublishTask]
    wait_time = between(2, 2)
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # 从行列中获取客户端 username 和 client_id
        current_client = client_queue.get()
        self.client = mqtt_client.Client(current_client[1])
        self.client.username_pw_set(current_client[0], PASSWORD)
        # self.client.username_pw_set(current_client[0] + "1", PASSWORD)  # 模仿client衔接报错
        # 界说一个容器,寄存已发送的音讯
        self.client.published_message = {}
    def on_start(self):
        # 设置tls
        context = ssl.SSLContext(ssl.PROTOCOL_TLS)
        self.client.tls_set_context(context)
        self.client.connect(host=BROKER_ADDRESS, port=PORT, keepalive=60)
        self.client.on_connect = self.on_connect
    def on_stop(self):
        print("publish 成功, 当时已成功发送数量:{}".format(total_published))
        if len(disconnect_record_list) == 0:
            print("无断开衔接的client")
        else:
            # 把断开记载里的信息写入csv
            with open("disconnect_record.csv", "w", newline='', encoding='UTF8') as csvfile:
                writer = csv.writer(csvfile)
                writer.writerow(['client_id', 'rc_status', 'disconnected_time'])
                for i in disconnect_record_list:
                    writer.writerow(i)
            print("断开衔接的client信息已写入csv文件")
    @staticmethod
    def on_connect(client, userdata, flags, rc, props=None):
        if rc == 0:
            print("rc状况:{} - -".format(rc), "{}-衔接broker成功".format(str(client._client_id)))
            fire_success(
                request_type="c_success",
                name='count_connected',
                response_time=0,
                response_length=0,
                exception=None,
                context=None
            )
        else:
            print("rc状况:{} - -".format(rc), "{}-衔接broker失利".format(str(client._client_id)))
            fire_success(
                request_type="c_fail",
                name="client_id: " + str(client._client_id),
                response_time=0,
                response_length=0,
                exception=None,
                context=None
            )
if __name__ == '__main__':
    run_single_user(MQTTLocustUser)

2. 代码分析-locust库部分

并发恳求才能仍是运用的locust库的才能。官方只提供了http协议接口的相关类,没直接提供mqtt协议的,但是咱们能够依照官方的规范,自界说相关的类,只要继承UserTaskSet即可。

User

首先是先界说User类,这儿便是用来生成我要用来测验的车辆。

基于Locust实现MQTT协议服务的压测脚本

类初始化的时分,黄色框里,会去行列里取出车辆信息,用来做一些相关的设置。client来源于from paho.mqtt import client as mqtt_client提供的才能,固定用法,依照人家的文档运用就行。

红色框里,是User类的2个重要了解特点:

  • tasks: 这儿界说了生成的用户需求去干哪些工作,也便是对应脚本里的PublishTask类下面界说的内容。
  • wait_time: 用户在履行task时刻隔逗留的时刻,能够是个区间,在里边随机。我这儿意思是每2s发送一次数据到broker。

绿色框里,界说了一个字典容器,用来寄存当时用户已发送成功的音讯内容,因为后边我要取出来把里边相关的数据写到生成的报表中去。

蓝色框里有2个办法,也是locust提供的才能:

  • on_start:当用户开端运转时调用,这儿我做了车辆衔接broker署理的处理,留意这儿需求设置tls,因为服务衔接需求。

    基于Locust实现MQTT协议服务的压测脚本

  • on_stop:当用户完毕运转时调用,这儿我做了一些其他的处理,比方把运转期间断开衔接的车辆信息写到本地csv中。

    基于Locust实现MQTT协议服务的压测脚本

TaskSet

界说好User类,就需求来界说TaskSet类,你得告诉发生出来的用户,要干点啥。

我这依据事务需求,便是让车辆不停的像broker发送数据即可。

基于Locust实现MQTT协议服务的压测脚本

红色部分,同样是paho.mqtt提供的才能,会发动新的线程去履行你界说的工作。

黄色部分,便是做发送数据的操作,而且我能够拿到一些回来,检查源码就能够知道回来的是MQTTMessageInfo类。

基于Locust实现MQTT协议服务的压测脚本

留意回来的2个特点:

  • mid: 回来这个音讯发送的顺序
  • rc: 表明发送的呼应状况,0 便是成功

绿色部分,还记得我在上面的User类中界说了一个容器,在这儿就把发送的音讯相关信息放到容器中去,留着后边运用。

2. 代码分析-paho.mqtt库部分

上面的代码现已用到了不少paho.mqtt的才能,这儿再进行整体梳理下。

  • client.Client():声明一个client
  • client.username_pw_set(): 设置客户端的用户名,暗码
  • client.tls_set_context: 设置ssl模式
  • client.connect(): 衔接署理
  • client.publish:向署理推送音讯

还用到了一些回调函数:

  • on_connect:衔接操作成功时回调
  • on_publish:发布成功时回调
  • on_disconnect:客户端与署理断开衔接时回调

别的还用到了一个事件函数events.request

基于Locust实现MQTT协议服务的压测脚本

当客户端发送恳求时会调用,不管是恳求成功仍是恳求失利;当我需求自界说我的报告内容时,就需求用到这个event

基于Locust实现MQTT协议服务的压测脚本

检查源码,知道里边要传哪些参数,那咱们在调用时分就需求传入对应的参数。

比方我在发送回调函数里调用了该办法。

基于Locust实现MQTT协议服务的压测脚本

所以最终在控制台显示的报告里就有我界说的内容了。

基于Locust实现MQTT协议服务的压测脚本

因为后来在运用中发现,不知道会在什么时分呈现批量断开的状况,所以在on_disconnect回调函数里添加了对应处理,把相关的断开信息记载下来,运转完毕的时分写到本地文件里去。

基于Locust实现MQTT协议服务的压测脚本

后来我主动测验客户端断开的状况测验了下文件的写入成果,功能正常。

基于Locust实现MQTT协议服务的压测脚本

三、小结

后边就开端运转了,在运转过程中,开发重视链路服务的各项指标,这儿就不展开了,事务缠身就并没有过多的去做这个工作,何况也不专业。的确也发现了不少问题,后边逐步优化,再持续测验。

现在安稳运转12h,服务正常,暂时就先告一段落了。后边还有会相关其他功能测验场景,到时就能够针对性的展开共享下了。

别的,这个脚本共享也仅仅仅供参考,现在我这是运用简略,本着能用就行,可能存在一些不合理需求优化的当地,有需求的朋友还请自行查阅相关文档。