最近在忙事务的间隙,穿插着做了些功能测验。
一、背景简介
事务背景大约介绍一下,便是依照国标规定,车辆需求上传一些指定的数据到ZF的指定渠道,一起车辆也会把数据传到企业云端服务上,所以乎就发生了一些功能需求。
现在咱们仅仅先简略的进行了一个功能场景的测验,便是评价现在服务是否能够支撑,预期的最大一起在线车辆上传数据。通过评价,在线车辆数据依照预期的10倍来进行的,而且后边添加持续运转12h检查服务链路的安稳性。
本篇并不是一个严谨的功能测验过程成果共享,主要是共享下关于mqtt协议服务的压测脚本的编写。因为之前我也没触摸过MQTT协议的压测,网上关于相关的压测脚本的内容也比较杂乱,所以记载一下,仅供参考。
捋一下链路就知道需求生成哪些数据(因为服务还未上线运用,所以发生的压测数据后边能够直接整理掉即可。):
- 一些前置数据:比方数据库、缓存里涉及到的车辆数据,通讯秘钥数据等等,这些能够之前写脚本一次性生成即可。
- 车辆上报的数据:车辆上报到云端的数据,是通过一系列加密转码,期间还要设计到解密等,这个通过评价,能够简化其间的某些环境,所以所有的车能够直接发送相同的数据即可。
- 车辆数据:最终便是生成对应的车辆数据,一起在线,依照评价的频率发送数据。
其间第1、2的数据在之前针对性的别离生成即可,第3步的车辆发送数据便是压测脚本要干的工作了。
二、技术选型
这个却是很快,搜索引擎大约搜了一下,内容很少,或者说对我有用的内容很少。有看到jmeter有相关插件的,但是这个方案基本上我都是否决的,一来我不擅长用,而来我觉得用起来肯定会比自己编码要费事的多。
所以就持续编码好了,依然首选python,想到了locust
库,后来看官方文档的时分,看到locust
也针对mqtt
协议拓宽了一些内容。但是我测验下来不太符合我这的需求,也可能当时我用的不对吧,所以就只能自己来从零开端编写了。
搜索中又发现Python
中用于mqtt
协议的库叫paho.mqtt
,支撑衔接署理,音讯的订阅、收发等等,所以最终确认运用:locust
+paho.mqtt
的组合来完成本次的负载脚本。
三、代码编写
1. 脚本代码
暂时没做代码分层,现在场景简略,就直接都放一个模块里了,有点长,先贴上来,后边部分会对脚本的要点内容进行拆解。
脚本现在做了这些工作:
- 从db中查询有用可用的所有测验车辆信息数据
- 依据命令行的输入参数,指定发动的车辆数,以及与broker署理树立衔接的频率
- 树立衔接成功的车辆,就能够依据脚本里指定的频次,来像broker发送数据
- 脚本核算衔接数、恳求数、呼应时刻等信息写到报表中
- 调试遇到车辆会批量断开衔接的状况,添加了当车辆断开衔接时,把断开时刻、车辆信息写到本地csv中,便利第二天来检查分析。
import csv
import datetime
import queue
import os
import sys
import time
import ssl
from paho.mqtt import client as mqtt_client
# 依据不同系统进行路径适配
if os.name == "nt":
path = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
sys.path.insert(0, path)
from GB_test.utils.mysql_operating import DB
elif os.name == "posix":
sys.path.append("/app/qa_test_app/")
from GB_test.utils.mysql_operating import DB
from locust import User, TaskSet, events, task, between, run_single_user
BROKER_ADDRESS = "broker服务地址"
PORT = 1111
PASSWORD = "111111"
PUBLISH_TIMEOUT = 10000 # 超时时刻
TEST_TOPIC = "test_topic"
TEST_VALUE = [16, 3, -26, 4, 0, 36,.......] # 用来publish的测验数据,仅示意
BYTES_DATA = bytes(i % 256 for i in TEST_VALUE) # 事务需求转换成 byte 类型后再发送
# 创建行列
client_queue = queue.Queue()
# 衔接DB,读取车辆数据
db = DB("db_vmd")
select_sql = "select xxxx"
client_list = db.fetch_all(select_sql)
print("车辆数据查询完毕,数据量:{}".format(len(client_list)))
for t in client_list:
# 把可用的车辆信息存到行列中去
client_queue.put(t)
def fire_success(**kwargs):
"""恳求成功时调用"""
events.request.fire(**kwargs)
def calculate_resp_time(t1, t2):
"""核算呼应时刻"""
return int((t2 - t1) * 1000)
class MQTTMessage:
"""已发送的音讯实体类"""
def __init__(self, _type, qos, topic, payload, start_time, timeout):
self.type = _type,
self.qos = qos,
self.topic = topic
self.payload = payload
self.start_time = start_time
self.timeout = timeout
# 核算总共发送成功的音讯数量
total_published = 0
disconnect_record_list = [] # 界说寄存衔接断开的记载的列表容器
class PublishTask(TaskSet):
@task
def task_publish(self):
self.client.loop_start()
topic = TEST_TOPIC
payload = BYTES_DATA
# 记载发送的开端时刻
start_time = time.time()
mqtt_msg_info = self.client.publish(topic, payload, qos=1, retain=False)
published_mid = mqtt_msg_info.mid
# 将发送成功的音讯内容,放入client实例的 published_message 字段
self.client.published_message[published_mid] = MQTTMessage(REQUEST_TYPE,
0,
topic,
payload,
start_time,
PUBLISH_TIMEOUT)
# 发送成功回调
self.client.on_publish = self.on_publish
# 断开衔接回调
self.client.on_disconnect = self.on_disconnect
@staticmethod
def on_disconnect(client, userdata, rc):
""" broker衔接断开,放入列表容器"""
disconnected_info = [str(client._client_id), rc, datetime.datetime.now()]
disconnect_record_list.append(disconnected_info)
print("rc状况:{} - -".format(rc), "{}-broker衔接已断开".format(str(client._client_id)))
@staticmethod
def on_publish(client, userdata, mid):
if mid:
# 记载音讯发送成功的时刻
end_time = time.time()
# 从已发送的音讯容器中,取出音讯
message = client.published_message.pop(mid, None)
# 核算开端发送到发送成功的耗时
publish_resp_time = calculate_resp_time(message.start_time, end_time)
fire_success(
request_type="p_success",
name="client_id: " + str(client._client_id),
response_time=publish_resp_time,
response_length=len(message.payload),
exception=None,
context=None
)
global total_published
# 成功发送累加1
total_published += 1
class MQTTLocustUser(User):
tasks = [PublishTask]
wait_time = between(2, 2)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# 从行列中获取客户端 username 和 client_id
current_client = client_queue.get()
self.client = mqtt_client.Client(current_client[1])
self.client.username_pw_set(current_client[0], PASSWORD)
# self.client.username_pw_set(current_client[0] + "1", PASSWORD) # 模仿client衔接报错
# 界说一个容器,寄存已发送的音讯
self.client.published_message = {}
def on_start(self):
# 设置tls
context = ssl.SSLContext(ssl.PROTOCOL_TLS)
self.client.tls_set_context(context)
self.client.connect(host=BROKER_ADDRESS, port=PORT, keepalive=60)
self.client.on_connect = self.on_connect
def on_stop(self):
print("publish 成功, 当时已成功发送数量:{}".format(total_published))
if len(disconnect_record_list) == 0:
print("无断开衔接的client")
else:
# 把断开记载里的信息写入csv
with open("disconnect_record.csv", "w", newline='', encoding='UTF8') as csvfile:
writer = csv.writer(csvfile)
writer.writerow(['client_id', 'rc_status', 'disconnected_time'])
for i in disconnect_record_list:
writer.writerow(i)
print("断开衔接的client信息已写入csv文件")
@staticmethod
def on_connect(client, userdata, flags, rc, props=None):
if rc == 0:
print("rc状况:{} - -".format(rc), "{}-衔接broker成功".format(str(client._client_id)))
fire_success(
request_type="c_success",
name='count_connected',
response_time=0,
response_length=0,
exception=None,
context=None
)
else:
print("rc状况:{} - -".format(rc), "{}-衔接broker失利".format(str(client._client_id)))
fire_success(
request_type="c_fail",
name="client_id: " + str(client._client_id),
response_time=0,
response_length=0,
exception=None,
context=None
)
if __name__ == '__main__':
run_single_user(MQTTLocustUser)
2. 代码分析-locust库部分
并发恳求才能仍是运用的locust
库的才能。官方只提供了http
协议接口的相关类,没直接提供mqtt
协议的,但是咱们能够依照官方的规范,自界说相关的类,只要继承User
和TaskSet
即可。
User类
首先是先界说User
类,这儿便是用来生成我要用来测验的车辆。
类初始化的时分,黄色框里,会去行列里取出车辆信息,用来做一些相关的设置。client
来源于from paho.mqtt import client as mqtt_client
提供的才能,固定用法,依照人家的文档运用就行。
红色框里,是User
类的2个重要了解特点:
-
tasks
: 这儿界说了生成的用户需求去干哪些工作,也便是对应脚本里的PublishTask
类下面界说的内容。 -
wait_time
: 用户在履行task时刻隔逗留的时刻,能够是个区间,在里边随机。我这儿意思是每2s发送一次数据到broker。
绿色框里,界说了一个字典容器,用来寄存当时用户已发送成功的音讯内容,因为后边我要取出来把里边相关的数据写到生成的报表中去。
蓝色框里有2个办法,也是locust
提供的才能:
-
on_start
:当用户开端运转时调用,这儿我做了车辆衔接broker署理的处理,留意这儿需求设置tls,因为服务衔接需求。 -
on_stop
:当用户完毕运转时调用,这儿我做了一些其他的处理,比方把运转期间断开衔接的车辆信息写到本地csv中。
TaskSet类
界说好User
类,就需求来界说TaskSet
类,你得告诉发生出来的用户,要干点啥。
我这依据事务需求,便是让车辆不停的像broker发送数据即可。
红色部分,同样是paho.mqtt
提供的才能,会发动新的线程去履行你界说的工作。
黄色部分,便是做发送数据的操作,而且我能够拿到一些回来,检查源码就能够知道回来的是MQTTMessageInfo
类。
留意回来的2个特点:
-
mid
: 回来这个音讯发送的顺序 -
rc
: 表明发送的呼应状况,0 便是成功
绿色部分,还记得我在上面的User
类中界说了一个容器,在这儿就把发送的音讯相关信息放到容器中去,留着后边运用。
2. 代码分析-paho.mqtt库部分
上面的代码现已用到了不少paho.mqtt
的才能,这儿再进行整体梳理下。
- client.Client():声明一个client
- client.username_pw_set(): 设置客户端的用户名,暗码
- client.tls_set_context: 设置ssl模式
- client.connect(): 衔接署理
- client.publish:向署理推送音讯
还用到了一些回调函数:
- on_connect:衔接操作成功时回调
- on_publish:发布成功时回调
- on_disconnect:客户端与署理断开衔接时回调
别的还用到了一个事件函数events.request
。
当客户端发送恳求时会调用,不管是恳求成功仍是恳求失利;当我需求自界说我的报告内容时,就需求用到这个event
。
检查源码,知道里边要传哪些参数,那咱们在调用时分就需求传入对应的参数。
比方我在发送回调函数里调用了该办法。
所以最终在控制台显示的报告里就有我界说的内容了。
因为后来在运用中发现,不知道会在什么时分呈现批量断开的状况,所以在on_disconnect
回调函数里添加了对应处理,把相关的断开信息记载下来,运转完毕的时分写到本地文件里去。
后来我主动测验客户端断开的状况测验了下文件的写入成果,功能正常。
三、小结
后边就开端运转了,在运转过程中,开发重视链路服务的各项指标,这儿就不展开了,事务缠身就并没有过多的去做这个工作,何况也不专业。的确也发现了不少问题,后边逐步优化,再持续测验。
现在安稳运转12h,服务正常,暂时就先告一段落了。后边还有会相关其他功能测验场景,到时就能够针对性的展开共享下了。
别的,这个脚本共享也仅仅仅供参考,现在我这是运用简略,本着能用就行,可能存在一些不合理需求优化的当地,有需求的朋友还请自行查阅相关文档。