api 接口测试工具:Postman、Apifox、Apipost、api压测(locust)、locust爬虫
Postman:支持离线使用,未登录状态下,以及内网环境下,都可以正常进行大部分操作
Apipost:支持离线使用,未登录状态下,以及内网环境下,都可以正常进行大部分操作。
apifox:不支持离线,而且不登录没法用。
1、Postman
From:https://zhuanlan.zhihu.com/p/534078123
Postman V9.16 绿色版汉化:https://www.cr173.com/soft/1497202.html
一、postman 简介
Postman 是一款功能强大的网页调试与发送网页HTTP请求的工具。有 Chrome 插件版本,也有Postman 本地应用程序版本,插件版本早已停止更新。详细了解为什么支持 Postman Chrome 应用程序已被弃用?:http://chromecj.com/web-development/2018-04/1376.html
postman 的特点
- postman 只做 http协议 的接口的测试,是一种最广泛 REST 接口测试客户端软件。
- postman 支持 http 协议的所有请求方式,包括 get、post、head、put、delete 等。
- postman 支持各种额外的头部字段的添加。
- postman 除了可以模拟普通表单数据外,还支持文件、图片、视频等数据请求。
- postman 是一个接口的开发和测试的全流程支持软件。
- 支持前端开发:mock(模拟) 测试
- 支持后端开发:接口测试、开发接口文档
- 支持测试:接口测试
- 支持运维:监控功能
- postman 支持云服务:随时随地都能无缝对接加班。
- 数据同步功能,家里、办公室的电脑登录同一账号,数据即可同步。
- 团队协作,你设计的请求可以团队内的推送,交给其他人执行或继续开发。
安装 postman
官网下载 postman:https://www.postman.com/downloads/
根据自己使用系统,下载对应的平台版本,默认安装即可。
注册和登录
也可以不注册,但是有些功能不能用
第一个接口测试
百度翻译接口的实现:是一个get类型的请求:https://fanyi-api.baidu.com/api/trans/vip/translate?q=apple&from=auto&to=zh&appid=&xxxxsalt=888888&sign=a9adc2d687fbacecc3b5059b9ccedc95
1)创建一个工程目录
如果没有特别的要求,只需要创建一次即可。
2)在工程目录下创建一个 collection 集合
collection 是 postman 管理接口请求的基本单位,首先就是把他创建出来。
3)创建一个接口请求(接口用例)
新建请求,重命名为baiduTest01:
4)拼装一个接口请求参数
对于一个get请求来说,需要三部分内容:
请求地址:https://fanyi-api.baidu.com/api/trans/vip/translate
请求方式:get
请求参数(params)
5) 断言
后续细讲。
6)发送请求
点击url地址栏后面的send按钮。
老版本 postman 界面
二、接口 测试的流程
获取请求的基本参数
- 做接口测试,基本上就是手动打包 http 请求报文,你要知道请求报文到底有哪些内容。
- 接口的 url 地址:找到接口所在的服务器及资源(一个文件夹或者一个文件、接口)
- 接口的请求方式:get、post等
- 必须的请求头部:(content-type、referer、cookie等)
- 请求参数(querry string parameters):可以单独存放,也可以拼接在url地址后面
- 请求的正文数据
怎么获取这些个参数:
1)有专门的接口文档,通过这个文档就能获取上面的参数
2)通过抓包工具获取(浏览器、fiddler等)
设计测试用例
按照指定的用例模板,依据接口的参数,采用等价类、边界值、参数组合(有的参数必选、可选等)形参请求数据,整理响应其他参数(url、请求方式等)和预期结果(断言),形成测试用例。
通过 postman 拼接请求
根据测试用例,将postman中的请求拼接出来,并send发送,查看结果。
创建collection-->request(请求,或者接口用例)
断言
通过断言能够自动判断业务逻辑是否正确,一般可以采用对响应的状态码、响应的正文进行判断,还可以采用响应头部的一些字段来断言。在接口测试中,断言也是必须的,没有断言的话,只是把请求发出去,不知道处理的对不对(接口实功能实现对不对)。
三、使用 postman 进行 HTTP 接口测试
get 请求
get请求不需要有请求的正文数据的,其他都要(url和请求方式必须,可选的是头部字段)。
1)百度翻译接口
自己再去做一遍即可。
请求地址:url:https://fanyi-api.baidu.com/api/trans/vip/translate
请求方式:get
params:q=apple&from=auto&to=zh&appid=xxxxx&salt=888888&sign=a9adc2d687fbacecc3b5059b9ccedc95
2)B站视频最新评论接口
url:https://api.bilibili.com/x/v2/reply/main
请求方式:get
params: callback=jQuery17207775567550413909_1655259544202&jsonp=jsonp&next=0&type=1&oid=248143527&mode=2&plat=1&_=1655259574655
在postman中实现上述的参数,形成参数用例:
视频类的网站都有一个防盗链功能,就算抓到了请求的基本参数,你也不能通过工具获取评论的参数,其实是通过一个头部字段进行的限制,这个字段就起到了防盗链的作用。
referer:一般只是当前视频所在的地址,用它做防盗链的作用,必须指定referer,而且referer的值和当前视频的地址一致,你才有权限获取评论信息。
至于使用哪一个字段来做防盗链,开发设计的,只不过使用referer的居多。
解决方法:
postman的header标签下,添加一个头字段referer(可以抓包获取)。
post 请求
除了 get 所需的所有参数(请求方式、请求地址、请求头部等),还需要请求正文数据。
1)百度翻译
使用post请求实现百度翻译功能,content-type一定是x-www-form-urlencoded
https://fanyi-api.baidu.com/api/trans/vip/translate?q=apple&from=auto&to=zh&appid=xxxxxx&salt=888888&sign=a9adc2d687fbacecc3b5059b9ccedc95
请求地址:https://fanyi-api.baidu.com/api/trans/vip/translate
请求类型:post
请求正文:
- q:apple
- from:auto
- to:zh
- appid:xxxxx
- salt:888888
- sign:a9adc2d687fbacecc3b5059b9ccedc95
- 请求头部:content-type:x-www-form-urlencoded
2) 电商前台的注册接口
特点是post请求,content-type要求是以x-www-form-urlencoded。
请求的地址:http://xxxxx/qftest/index.php?c=user&a=register&step=submit
请求的方式:post
请求的参数:拼接在地址栏了
请求的数据:
- username: 给一个已注册的数据、空、4位、数字开头、正常未注册等
- password
- repassword
- agree
请求的头部:content-type:application/x-www-form-urlencoded
3)电商登录接口
请求地址:http://xxxxx/qftest/user/login.html?step=submit
请求方式: post
请求正文:
- username:bk2201_00001
- password:200c6d94e583e62c6964de3acdc723e5
请求头部:content-type:application/x-www-form-urlencoded
post 请求体
content-type 类型位form-data,数据传输仍然是键值对,数据类型可以是文件(word、excel、图片、视频等)。
1)蜜锋OA系统登录功能--urlencoded
请求url:http://xxxxx/MiFengOA/index.php?a=check&m=login&d=&ajaxbool=true&rnd=607798
请求方式:post
请求的数据:
请求头部:content-type: x-www-form-urlencoded
2)蜜锋OA系统上传图片的功能--form-data
只有登录成功之后才能够上传图片,登录失败(没有登录)不能上传。
请求地址:http://xxxxx/MiFengOA/index.php?a=upfile&m=upload&d=public&maxsize=80&uptype=image&thumbnail=150x150&ajaxbool=true&rnd=322198
请求方式:post
请求数据:file: (binary)
请求头部:content-type:multipart/form-data
3) 多接口的实现-cookie的使用
cookie作为一种鉴权的方式,登录某个系统之后,再次访问系统的不同页面,都能保持登录状态,就是因为后续的所有请求都携带了cookie的参数。
第一次登录成功,postman或者浏览器,可以将登录所用的用户名及密码等记录在本地。
再在访问其他接口的时候,浏览器发出的请求就会自动化从cookie管理器中携带和当前主机相关的cookie及值,这样就一直保持了登录状态。
在postman中的两个请求,登录和上传签名。
如果希望通过postman的cookie管理器,来共享cookie值,就可以调整他俩的现后执行顺序即可,先登录再上传即可,每次都能动态获取最新的cookie值,一般不会出错。
如果不想使用cookie管理器,不想先登录怎么办,只能手动给上传接口添加一个cookie参数,应为cookie是有时效性的,过一段时间就不能用了。
post 请求头
json 数据结构作为请求的正文数据及响应正文数据是最常见的用法,将来大家接触的80%都是这种格式的。
将来项目中 json 数据会更复杂、内容会更多,上百行数据都很正常。
住逻辑的登录的接口来演示。
请求地址:https://xxx/designer_api/account/login_quick
请求的方式:post
请求的数据:
{"phone":"xxx","code":"123456","messageType":3,"key":"a2088d42-2eb0-4194-aada-e3a0019ed5f1","registration_type":1,"channel":"zhulogic","unionid":""}
请求的头部
Content-Type:application/json;charset=UTF-8
四、变量的使用
环境变量
主要用于环境迁移。
postman中支持两种环境,一种是全局的环境变量、另一种是局部的环境变量。
- 全局的环境变量:只有一组,就是global环境,所有的集合和请求可以共享这个环境的变量。
- 局部的环境变量:environment环境,可以设置多组,需要指定给集合或者请求才能使用。
场景:假设我们一个系统有500条接口请求,突然有一天服务器的地址更换了(测试环境迁移到预发布环境中去),这时候需要把500个请求中的主机名部分更换一遍。
而全局环境及局部环境的变量就可以解决这种耗时的任务。
1)globals全局环境设置
2)environment局部环境设置
集合 collection 变量
绑定在集合上的变量,只能给集合下的请求使用。
然后在该集合中就可以直接使用{{password}}来参数化数据了。
注意:不能跨集合使用。
五、预处理、断言
这两个模块采用的是js语法脚本。
预处理 --- pre-request script 模块
在当前请求发送之前要处理的脚本,我们能做点什么事情?
用于处理请求数据(获取、设置、加密等)
在发送当前请求之前发送另外一个请求(OA的登录和上传图片)。
1)发送一个 get 请求
2)获取参数(全局环境、局部环境、集合变量)
3)修改、设置参数(掌握)
4)发送一个post类型的请求
//发一次登录请求??
//是在OALoadImage前要处理的脚本
//实现发送一个post请求
var postInfo = {
"url":"http://xxxx/MiFengOA/index.php?a=check&m=login&d=&ajaxbool=true&rnd=607798",
"method":"post",
"body":{
"mode":"urlencoded",
"urlencoded":"rempass=0&jmpass=false&device=1650464000397<ype=0&adminuser=YWRtaW4:&adminpass=YTg5ODM5ODM:&yanzm="
}
}
pm.sendRequest(postInfo, function (err, response) {
//如果响应正文是json格式的,就可以response.json()输出响应正文
//如果响应正文是不是json格式的,就输出字符粗格式:response.text()
console.log(response.json());
});
断言 - tests
在当前请求发送之后要处理的脚本,是作为断言来使用的。
发送完当前请求之后,对响应的结果进行判断、断言
断言的内容可以是:响应正文、响应状态码、响应头部的字段呢?响应时间等
响应正文断言:包含子字符串(掌握)和json断言(掌握)
其他断言方式了解:响应状态码断言、响应时间断言等
//实现断言,是在当前请求发送完成之后,得到响应结果才能进行的。
//1、判断响应状态码是否符合预期,并不能完全确定业务是否是正确的
pm.test("响应状态码是200?", function () {
//实现判断语法
//pm.response: http的响应报文(四大组成部分)
// to.have.status(200):是否包含状态码200呢?
pm.response.to.have.status(200);
});
//了解就行
tests["响应状态码是否为200?"] = responseCode.code === 200;
// 2、响应正文做断言(重点掌握)
// 以字符串格式的正文形式断言(都行)
// 大串(实际结果)包小串(预期结果)。
// pm.response.text():将响应报文中的正文部分转化为字符串格式
pm.test("大串包小串??", function () {
pm.expect(pm.response.text()).to.include("用户名不符合格式要求");
});
//responseBody :获取的是响应正文
tests["响应正文包括指定字符串?"] = responseBody.has("用户名不符合格式要求");
// 以json格式的正文进行i断言(响应报文头部的content-type为json的可以用)
//直接使用json断言
pm.test("Your test name", function () {
var jsonData = pm.response.json();
console.log(jsonData.trans_result[0].dst)
pm.expect(jsonData.trans_result[0].dst).to.eql("苹果");
});
//响应时间:从发出请求,到接收到响应结果的时间差就是响应时间,是接口的一个性能指标
// 假设要求,该请求响应时间不应该高于200ms
pm.test("响应时间的判断:", function () {
//pm.response.responseTime:实际的响应时间
// to.be.below:低于某个指定的值
pm.expect(pm.response.responseTime).to.be.below(400);
});
六、Runner 运行器的用法
runner 是 postman 中执行 collection 集合中请求的一种用法,可以调整执行的顺序和用例的数量。可以记录执行结果及导出结果报告(json格式的报告)。
运行测试集合
选择一个测试集合,启动运行器。
按照如下设置,点击运行:
执行结果:
参数化(数据驱动测试)
1)json文件数据驱动
创建json文件,并设置数据:
使用文件中的键名参数化postman正文数据值。
断言也需要进行参数化:
使用Runner运行器,导入、并查看数据文件
因为有四条数据,迭代次数就默认给设置成了4次。
它会每次迭代从文件中读取一行数据进行参数化,并允许。
直到四次迭代结束,数据使用完毕。
做好设置,保存响应结果,点击运行集合。
2)csv文件数据驱动
后续操作过程见json数据驱动过程。
七、newman 插件的使用
newman是postman的插件,是用于命令行运行测试集合的一个插件。
安装
先安装nodejs,通过npm -v验证
最好安装16版本以上。
再安装Newman,npm install -g newman,也是通过newman -v来验证。
指定版本安装格式: npm install -g newman@5.2.4
如果出现安装进度慢,默认镜像源(软件所在的服务器)在国外,可以更新到国内的镜像源服务器上去。
npm config set registry http://registry.npm.taobao.org
使用newman运行collection
前提准备:
- 测试集合文件,是通过postman导出的json文件。
- 环境变量文件,是需要通过postman导出的json文件。
- 数据驱动文件:data.json、data.csv
1) 只运行一个collection集合(不涉及到环境变量、不涉及参数化)
导出集合文件为:zhuluoji_collection.json
就可以使用newman运行这个集合文件了:
格式: newman run 集合文件的全路径
2) 指定迭代次数 : -n
newman run e:\zhuluoji_collection.json -n 2
3) 指定局部环境变量:-e
导出environment环境变量文件。
newman run e:\zhuluoji_collection.json -e e:\BaiDuTrans_environment.json -n 2
4) 指定全局环境变量:-g
导出globals环境变量文件:
newman run e:\BaiDuTrans_collection.json -e e:\BaiDuTrans_environment.json -g e:\globals.json -n 1
5) 指定参数化文件的:-d
可以支持json和csv文件进行参数化的。
newman run e:\MiFeng_collection.json -d e:\data.json
newman run e:\MiFeng_collection.json -d e:\data.csv
生成报告
1) cli过格式报告
在cmd下运行的结果报告的展示形似。
2) json格式报告
-r json : 指定输出报告的格式是json格式。
newman run e:\MiFeng_collection.json -d e:\data.csv -r json --reporter-json-export e:\data\result1212.json
3) html格式报告
-r html : 指定输出报告的格式为html格式
newman run e:\MiFeng_collection.json -d e:\data.csv -r html --reporter-html-export e:\data\result1212.html
但是html格式输出,需要安装插件才能使用。
npm install -g newman-reporter-html
4) 集成命令到批处理文件中(bat文件)
创建一个txt文件,重命名为run.bat文件,将其编码改为utf-8,将上面可执行的newman命令复制到该文件即可。
保存后,双击该文件运行。
八、newman+jenkins集成做定时任务
配置jenkins支持newman的环境变量
需要配置newman和node的路径,通过where命令来获取。
填如下面的path变量中。
创建自由风格的项目
配置项目
1)定时任务
2)创建windows patch command构建
保存、退出。
运行任务
1)手动运行
2)定时运行
根据设置的触发时间,自动运行脚本。
九、Mock 挡板测试
创建挡板服务
配置相关参数
指定环境变量,运行挡板测试
十、监控测试
系统上线了,客户使用的时候有个接口失效了,客户会反馈问题给运维、运维会提交给测试、测试提交给开发,开发定位、修复这个问题,走这么一圈,可能三四天时间过去了,反馈的效率太低,影响会很大。
诉求:反馈效率要高,影响降低到最小。
解决:通过实时监控接口的方式,每隔固定时间给接口发送请求,通过返回的响应结果确定接口的正确性,如果接口断言失败,则直接发送邮件给指定的人。
postman中如何实现:
配置监控
运行监控
监控过程
邮件提醒
十一:关联技术
关联技术:解决多个接口之间,数据交互的问题(比如第一个接口响应数据要给第二个接口作为参数使用),我们动态提前第一个接口响应数据,参数化第二个接口的过程就是关联。
电商后台首页接口
请求地址:http://xxxx/qftest/index.php?m=backend&c=main&a=index
请求方式:get
电商登录接口
请求地址:http://xxxxxx/qftest/index.php?m=backend&c=main&a=login
请求方式:post
请求头部:Content-Type: application/x-www-form-urlencoded
请求正文:
- M98v8: 848750613
- username: xxxx
- password: 091bfa87c505bba664b431baf83cbc19
商品的删除
通过商品的id删除商品
请求地址:http://xxxx/qftest/index.php?m=backend&c=goods&a=delete&id=212
请求方式:get
商品的添加
Request URL: http://xxxxxx/qftest/index.php?m=backend&c=goods&a=add&step=submit
Request Method:POST
请求头部:Content-Type: multipart/form-data
请求数据:
- goods_name: xiaomi12pro
- cate_id: 59
- brand_id: 1
- goods_sn: 010100112
- now_price: 1999
- original_price: 2999
- newarrival: 1
- status: 1
- goods_image:
- stock_qty: 9999
- goods_weight: 0.00
- meta_keywords:
- meta_description:
- goods_brief: <p>aa</p>
十二、token 技术使用
通过关联技术获取验证码接口提供的token,并在后续的请求中带上token值,才能访问到服务器端的数据。
获取验证码的接口
Request URL: http://xxxxxx/student/api/capchaRestController/captcha
Request Method: POST
登录接口
Request URL: http://zxxxxx/student/api/login
Request Method: POST
Request Header:Content-Type: application/json
Requests Body:
{
"mobile":"xxxxx",
"password":"123456",
"imgCode":"00635",
"imgToken":"9be7d1a7-86ef-4f96-bbd0-97dfb11dbf6e"
}
登录后的其他操作-学习中心
Request URL: http://xxxx/student/api/line/list/24
Request Method: POST
Request Header:Authorization:
eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJhdWQiOiJRRkVEVV9Qc1lEMkdkM3lvMFg2bGttQXJyTnRBPT0iLCJleHAiOjE2NTYxMzM1ODV9.GaMxcrj6uYfyFhWYQKJCNqcIRrLCM9YnJXA0mX2_5es
Content-Type:application/json
runner运行器测试执行
使用runner运行器,按照业务流程顺序执行接口,便可实现业务流程。
到此,allen老师的又一篇万字长文就暂告段落,后续postman的其他功能也会持续更新在这篇文章里,大家可以收藏该文章,持续学习。
2、Apifox 快速入门
入门
接口文档
接口调试
环境 & 变量
前后置操作 & 脚本
自动化测试
接口 Mock ( 模拟 )
自动生成代码
:https://apifox.com/help/code-generation
导入 / 导出接口
团队 & 项目
账号 & 软件设置
应用与插件
最佳实践
WebSocket 接口
WebService 接口
Socket 接口
gRPC 接口
Dubbo 接口
参考资料
常见问题
1. Apifox 是否收费?
Apifox 公网版 (SaaS 版) 免费,私有化部署版收费。
2. 登录(Auth)态如何实现?
请参考文档:登录态(Auth)如何处理。
3. 接口发送请求前需要调用登录接口获取 token 放在 header,如何实现?
请参考文档:登录态(Auth)如何处理。
4. B 接口请求参数依赖于 A 接口返回的数据,如何实现?
请参考文档:接口之间如何传递数据。
5. 同项目下有不同域名的接口,如何处理?
方案一:在环境里新增多个服务,分别设置不同的前置 URL ,接口分组和接口维度可以指定对应的前置 URL。推荐本方案!
方案二:把域名设置成环境变量如
DOMAIN_1
,接口路径这样填写:https://{{DOMAIN_1}}/users
。接口路径是以http://
或https://
起始的,系统会自动忽略里环境里前置 URL。方案三:给不同域名接口设置不同环境,通过切换环境来运行不同域名下的接口。不推荐本方案!
6. 脚本如何读取或修改接口请求信息?
请参考文档: 脚本读取/修改接口请求信息
7. 是否支持查询数据库字段作为参数传给接口?
支持,请参考文档:数据库操作
8. 数据是存储在本地还是云端?可否离线使用?可否私有化部署?
目前 Apifox 有
Saas 版
和私有化部署版
。
Saas 版
是免费的,数据都是存在云端的,需要联网才能使用。
私有化部署版
是收费的,数据存在使用者企业内部,不连外网也可以使用。注意
环境变量/全局变量里的 本地值 仅存放在本地,不会同步到云端,团队成员之间也不会相互同步,适合存放
token
、账号
、密码
之类的敏感数据。
9. 使用 Postman 调用接口返回正常,而 Apifox 返回错误
解决方法:对比 postman 和 apifox 实际发出的请求内容(url、参数、body、header)是否完全一样。
查看实际请求内容方法:
- Apifox:返回内容下的
实际请求
tab 里查看- Postman:点击底部状态栏里的
Console
查看
10. 为什么修改了环境变量(或全局变量)值,而引用的地方没有生效?
- 请检查
环境变量
、全局变量
、临时变量
里是不是有多个地方定义了相同名称
的变量,如果有,系统会根据优先级来取值。优先级顺序如下:临时变量
>环境变量
>全局变量
。- 请检查修改的是否是
本地值
,环境变量(或全局变量)仅读取本地值
,而不会读取远程值
。
11. Web 端与客户端有何区别?
Web 端与客户端在主要流程的使用上没有明显差异,都能够满足团队内的接口协作需求,但以下功能存在差异。
以下截图均为 Web 端截图。
导出接口
Web 端:❌ 客户端:✅
Agent 服务
Web 端:✅ 客户端:❌
本地 Mock 功能
Web 端:❌ 客户端:✅
生成业务代码
Web 端:❌ 客户端:✅
外部程序
Web 端:❌ 客户端:✅
调整字体大小
Web 端:❌ 客户端:✅
网络代理
Web 端:❌ 客户端:✅
12. Web 端与客户端数据不同步如何处理?
若发现客户端中某个项目的接口数据与 Web 端不一致,那么有可能是因为两端数据未同步。你可以尝试以下两种方法解决:
- 退出 Apifox 客户端后重新运行。
- 进入 Apifox 客户端中的项目后,点击右上角的“刷新”按钮。
3、Apipost
官网:https://www.apipost.cn/
官网文档:https://v7-wiki.apipost.cn/docs/3/
4、api 压测 工具
web 压测 工具
- JMeter:一个广泛使用的开源压力测试工具,可用于测试Web应用程序的性能,包括APIs。
- PerformanceRunner:泽众PerformanceRunner(简称PR)是国内专业的支持http、https、websocket、tcp/ip、MQ等各种协议、10万+海量并发、可靠的性能测试工具/压力测试工具,降低了应用系统运行风险。
- Gatling:这是另一个开源压力测试工具,使用Scala编写,可用于测试Web应用程序和APIs的性能。
- LoadRunner:这是一种商业压力测试工具,可测试多种协议,包括Web、API等
- Postman:这是一个流行的API开发工具,它还具有测试和监视API性能的功能。
- Apache Bench:这是一个简单但功能强大的工具,可用于测试Web应用程序和APIs的性能。
- Siege:这是另一个免费的压力测试工具,可用于测试Web应用程序和APIs的性能。
Jmeter 基于多线程,Locust 基于协程。 Locust 默认的 HttpSession 客户端性能有点低,做压测还是建议使用 FastHttpLocust 客户端,但是 Locust 官网也提到了,FastHttpLocust 并不能完全替代 HttpSession,这个还得取决于测试场景
发压能力:相同并发下,Locust(使用FastHttpLocust)> Jmeter
并发能力:Locust和Jmeter旗鼓相当,都能满足工作需求,Jmeter 消耗的内存更高
如果只是做简单的接口测试、压力测试,没有需要写代码来扩展的特殊需求,首选 Jmeter;
如果某些测试场景需要写代码来扩展,你会 Java 的话,可以选择Jmeter;
如果某些测试场景需要写代码来扩展,你会 Python 的话,可以选择 Locust;
如果想在单台机器发起更大的压力的话,并且 Python 代码能力不错的话,可以选择 Locust,记得一定要使用 FastHttpLocust 客户端
蝗虫 (LOCUST)
官网:https://www.locust.io/
官网文档:https://docs.locust.io/en/latest/
github:https://github.com/locustio/locust
蝗虫测试本质上只是一个 Python 程序,向要测试的系统发出请求。来进行百万长连接性能测试。Locust 基于 gevent 使用协程机制,避免了系统资源调度,由此可以大幅度提高单机的并发性能。
Locust 是使用 python 开发的,自带一个Web UI,用于定义用户模型,发起测试,实时测试数据,错误统计等。
使用类 linux 平台时请一定要修改最大文件打开数量。 可以使用 ulimit -n 查看当前支持的文件句柄,并用 ulimit -n xxxx 来进行修改。ulimit -n 65535
API
安装 Locust
安装:pip install locust
locust --help
用法: locust [options] [UserClass ...]
命令选项:
-h, --help 帮助
-f <filename>, --locustfile <filename> py脚本文件
--config <filename> 配置文件
-H <base url>, --host <base url> 要测试的URL地址
-u <int>, --users <int> Locust并发用户的数量
-r <float>, --spawn-rate <float> 生成用户的速率(每秒)
-t <time string>, --run-time <time string> 运行多长时间(300s, 20m, 3h, 1h30m), 默认一直运行
-l, --list 列出可用的 User classes 并退出。示例: locust -f my_py.py -l
Web UI 选项:
--web-host <ip> 运行web绑定的网卡地址. 默认所有。
--web-port <port number>, -P <port number> 运行web的绑定端口
--headless 无头模式,关闭web界面,立即启动测试。使用-u和-t来控制用户数量和运行时间
--autostart 立刻开始测试 (跟 --headless 很像, 但是不会禁用 web UI)
--autoquit <seconds> 在运行结束后X秒完全退出蝗虫。只能与 --autostart 一起使用。
默认情况下,Locust将一直运行,直到您使用CTRL+C关闭它
--web-auth <username:password> 基本验证。格式:username:password
--tls-cert <filename> 用于通过HTTPS提供服务的TLS证书的可选路径
--tls-key <filename> 用于通过HTTPS提供服务的TLS私钥的可选路径
--class-picker 启用web界面中的选择框,选择 "用户类"
--modern-ui 使用新的基于react的web UI前端
Master options: 主节点选项
worker节点连接到master节点后,master才能进行负载测试
--master 启动 master 节点,等待 worker 进行连接
--master-bind-host <ip> master节点绑定的网卡ip,默认绑定所有网卡。
--master-bind-port <port number> master监听的端口,默认 5557
--expect-workers <int> 延迟测试,至到指定数量的worker连接成功后才进行测试
--expect-workers-max-wait <int> 等待时间。
Worker options: 从节点 选项
--worker 设置为 worker 节点
--processes <int> worker的进程数
--master-host <hostname> master节点的ip. 默认 127.0.0.1.
--master-port <port number> master节点的端口. 默认 5557.
Tag 选项:
可以使用@tag装饰器标记蝗虫任务。这些选项允许指定在测试期间包含或排除哪些任务。
-T [<tag> ...], --tags [<tag> ...] 列出包含的测试
-E [<tag> ...], --exclude-tags [<tag> ...] 列出排除的测试
Request statistics options:
--csv <filename> 以CSV格式存储请求统计信息到文件。
--csv-full-history
--print-stats 启用在UI运行中定期打印请求状态
--only-summary 禁用在 --headless 运行期间定期打印请求统计信息
--reset-stats 一旦 spawning 完成就重置状态
--html <filename> 将HTML报告存储到指定的文件路径
--json 将最终统计数据以JSON格式打印到stdout。
Logging options:
--skip-log-setup 禁用蝗虫的日志设置。使用由Locust测试或者Python提供的。
--loglevel <level>, -L <level> DEBUG/INFO/WARNING/ERROR/CRITICAL. Default is INFO
--logfile <filename> log文件路径
其他 options:
--show-task-ratio 打印用户类的任务执行比率表。
--show-task-ratio-json 打印User类任务执行率的json数据。
--version, -V
--exit-code-on-error <int> 设置要在测试结果包含任何失败或错误时使用的进程退出代码。默认为1
-s <number>, --stop-timeout <number>
--equal-weights 使用均匀分布的任务权重,覆盖locustfile中指定的权重。
--enable-rebalancing 允许在测试运行期间添加或删除新工作者时自动重新平衡用户。
User classes:
<UserClass1 UserClass2> 在命令行末尾,您可以列出要使用的用户类(可用的用户类)
可以用——list)列出。LOCUST_USER_CLASSES环境变量也可用于
指定用户类。默认是使用所有可用的User类
示例:
locust -f my_test.py -H https://www.example.com
locust --headless -u 100 -t 20m --processes 4 MyHttpUser AnotherUser
编写 web 服务
首先使用 fastapi 启动一个 web 服务:
import fastapi
import uvicorn
from pathlib import Path
from fastapi import Request
app_main = fastapi.FastAPI()
@app_main.get("/hello")
@app_main.post("/hello")
async def func(request: Request):
ret_val = {"response": "测试 hello 请求"}
return ret_val
@app_main.get("/world")
@app_main.post("/world")
async def func(request: Request):
ret_val = {"response": "测试 world 请求"}
return ret_val
def http_server():
uvicorn.run(f'{Path(__file__).stem}:app_main', host="0.0.0.0", port=9000)
pass
if __name__ == '__main__':
http_server()
pass
使用 locust 测试 并发数
my_test.py
from locust import HttpUser, task
class HelloWorldUser(HttpUser):
@task
def hello_world(self):
self.client.get("/hello")
self.client.get("/world")
执行命令:locust -f my_test.py --modern-ui
访问 http://127.0.0.1:8089 进行测试前的配置。
- 第1个"Number of total users to simulate" 填写的是 总共将运行的用户数;默认1就可以。
- 第2个 "Hatch rate"每秒加载的用户数;默认1就可以。
- 第3个 "Host",被测接口的域名或ip端口地址(带http://)
配置完成后,点击 start swarm 开始进行测试,locust 就会不停的向测试服务发送请求,就可以测出每条中并发数了
因为高级参数里面没有配置测试时间, 所以会一直向 /hello
发出 /world
HTTP 请求,手动点击 stop 停止发送请求。
Python 直接运行
可以直接执行 Python 代码启动负载测试,而不是使用命令 locust
。
首先 创建一个 Environment
实例:
from locust.env import Environment
env = Environment(user_classes=[MyTestUser])
Environment
的实例方法 create_local_runner、create_master_runner 可以用来启动一个 Runner实例,Runner实例可以用来启动一个负载测试:
from locust import HttpUser, task
class HelloWorldUser(HttpUser):
@task
def hello_world(self):
self.client.get("/hello")
self.client.get("/world")
if __name__ == "__main__":
from locust.env import Environment
env = Environment(user_classes=[HelloWorldUser], host="http://127.0.0.1:9000")
env.create_local_runner()
env.runner.start(5000, spawn_rate=20)
env.runner.greenlet.join()
也可以绕过调度和分发逻辑,手动控制生成的用户:
new_users = env.runner.spawn_users({MyUserClass.__name__: 2})
new_users[1].my_custom_token = "custom-token-2"
new_users[0].my_custom_token = "custom-token-1"
上面的示例仅适用于独立/本地运行程序模式,并且是一个实验性功能。更常见/更好的方法是使用init 或 test_start 事件钩子来获取/创建令牌列表,并使用on_start和on_stop方法从该列表中读取并将它们设置在您的单个User实例上。
虽然可以通过这种方式 ( 使用 create_worker_runner
) 创建 locust 工作线程,但这几乎没有意义。每个工作线程都需要在单独的Python进程中,直接与工作线程运行程序交互可能会破坏一些东西。只需使用常规 locust --worker ...命令启动工作程序即可。
还可以使用 Environment
实例 create_web_ui
的方法启动一个 Web UI,该 UI 可用于查看统计信息并控制运行器(例如启动和停止负载测试):
from locust import HttpUser, task
class HelloWorldUser(HttpUser):
@task
def hello_world(self):
self.client.get("/hello")
self.client.get("/world")
if __name__ == "__main__":
from locust.env import Environment
env = Environment(user_classes=[HelloWorldUser], host="http://127.0.0.1:9000")
env.create_local_runner()
env.create_web_ui()
env.web_ui.greenlet.join()
import os
import sys
from locust import HttpUser, task
class HelloWorldUser(HttpUser):
@task
def hello_world(self):
self.client.get("/hello")
self.client.get("/world")
if __name__ == "__main__":
script_file_path = sys.argv[0]
print(script_file_path)
os.system(f"locust -f {script_file_path}")
完整示例
#!/usr/bin/env python3
import gevent
from locust import HttpUser, task, events
from locust.env import Environment
from locust.stats import stats_printer, stats_history
from locust.log import setup_logging
setup_logging("INFO", None)
class MyUser(HttpUser):
host = "https://docs.locust.io"
@task
def t(self):
self.client.get("/")
# setup Environment and Runner
env = Environment(user_classes=[MyUser], events=events)
runner = env.create_local_runner()
# start a WebUI instance
web_ui = env.create_web_ui("127.0.0.1", 8089)
# execute init event handlers (only really needed if you have registered any)
env.events.init.fire(environment=env, runner=runner, web_ui=web_ui)
# start a greenlet that periodically outputs the current stats
gevent.spawn(stats_printer(env.stats))
# start a greenlet that save current stats to history
gevent.spawn(stats_history, env.runner)
# start the test
runner.start(1, spawn_rate=10)
# in 60 seconds stop the runner
gevent.spawn_later(60, lambda: runner.quit())
# wait for the greenlets
runner.greenlet.join()
# stop the web server for good measures
web_ui.stop()
示例:百万长连接性能测试
:https://zhuanlan.zhihu.com/p/97577744
# locust_test1.py
from locust import HttpLocust, TaskSet, task, between
class UserBehavior(TaskSet):
def on_start(self):
# on_start是在task中任何用户开始时都会调用的部分我们一般来进行初始化
self.login()
def on_stop(self):
# on_stop 在停止时调用,我们可以用来回收资源
self.logout()
def login(self):
self.client.post("/login", {"username":"ellen_key", "password":"education"})
def logout(self):
self.client.post("/logout", {"username":"ellen_key", "password":"education"})
# @task装饰器,更方便我们的使用,所有带@task都会进行调用
@task(2)
def index(self):
# 2/3的概率调用获得首页方法
self.client.get("/")
@task(1)
def profile(self):
# 1/3概率调用获得用户信息方法
self.client.get("/profile")
class WebsiteUser(HttpLocust):
host = "http://test.cn"
# 我们首先给task_set赋值
task_set = UserBehavior
# 设定下次调用等待时间,单位为秒
wait_time = between(5, 9)
接下来我们开始启动测试
- 启用 WEB 界面:locust -f locust_test1.py 执行后可以去 WEB 界面 http://127.0.0.1:8089 进行控制,
- 启用无WEB界面的方案 locust -f locust_test1.py --no-web -c 100 -r 20 -t 20m 模拟100用户,按20来进行递增,请求20分钟。
主从模式启动
locust -f locst_test1.py --master
locust -f locst_test1.py --slave --master-host=192.168.110.19
长连接脚本
简单的安装和QG我们都看过了,现在我们开始实战tcp长连接方式。因内部通信协议保密我们使用之前我开源的一个《超快地球物理坐标计算服务器》来进行演示。首先我们使用docker来启动服务器 docker run --rm -t -p 40000:40000 gcontainer/earth-server earth_server -c
我们首先创建一个Socket连接的基础类,主要负责socket连接的建立、收发消息、关闭
class SocketClient(object):
def __init__(self):
# 仅在新建实例的时候创建socket.
self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
def __getattr__(self, name):
conn = self._socket
def wrapper(*args, **kwargs):
# 根据后面做的业务类,不同的方法做不同的处理
if name == "connect":
try:
conn.connect(args[0])
except Exception as e:
print(e)
elif name == "send":
print(' '.join(hex(ord(i)) for i in args[0]))
conn.sendall(args[0])
data = conn.recv(1024)
print(data)
elif name == "close":
conn.close()
return wrapper
接下来我们创建一个实际的业务处理类UserBehavior集成自TaskSet
class UserBehavior(TaskSet):
def on_start(self):
# 该方法每用户启动时调用进行连接打开
self.client.connect((self.locust.host, self.locust.port))
def on_stop(self):
# 该方法当程序结束时每用户进行调用,关闭连接
self.client.close()
@task(1)
def sendAddCmd(self):
# 处理坐标的增加1%的概率调用 该方法
lat, log = generate_random_gps()
dataBody = [
'add ',
ranstr(6),
' ',
format(log,'f'),
' ',
format(lat,'f'),
'\x0d','\x0a']
start_time = time.time()
# 接下来做实际的网络调用,并通过request_failure和request_success方法分别统计成功和失败的次数以及所消耗的时间
try:
self.client.send("".join(dataBody))
except Exception as e:
total_time = int((time.time() - start_time) * 1000)
events.request_failure.fire(request_type="earthtest", name="add", response_time=total_time, response_length=0, exception=e)
else:
total_time = int((time.time() - start_time) * 1000)
events.request_success.fire(request_type="earthtest", name="add", response_time=total_time, response_length=0)
@task(99)
def sendGetCmd(self):
lat, log = generate_random_gps()
dataBody = [
'get ',
format(log,'f'),
' ',
format(lat,'f'),
' 5',
'\x0d','\x0a']
start_time = time.time()
try:
self.client.send("".join(dataBody))
except Exception as e:
total_time = int((time.time() - start_time) * 1000)
events.request_failure.fire(request_type="earthtest", name="get", response_time=total_time, response_length=0, exception=e)
else:
total_time = int((time.time() - start_time) * 1000)
events.request_success.fire(request_type="earthtest", name="get", response_time=total_time, response_length=0)
最终实现我们的启动类,一个完整的调用过程结束
class SocketUser(SocketLocust):
# 目标地址
host = "127.0.0.1"
# 目标端口
port = 40000
task_set = UserBehavior
wait_time = between(0.1, 1)
我们模拟200用户启动下试试脚本。locust -f locust_tcptest.py --no-web -c 200 -r 50 -t 10m
参考资料
示例:变成性能测试老司机
:https://zhuanlan.zhihu.com/p/143892229
编写 locustfile
locustfile 文件
locust 文件只是一个普通的 Python 模块
一个更完整/更现实的测试示例:
import time
from locust import HttpUser, task, between
class QuickstartUser(HttpUser):
wait_time = between(1, 5) # 使模拟用户在执行每个任务(见下文)后等待 1 到 5 秒
@task
def hello_world(self):
self.client.get("/hello")
self.client.get("/world")
@task(3)
def view_items(self):
for item_id in range(10):
self.client.get(f"/item?id={item_id}", name="/item")
time.sleep(1)
def on_start(self):
self.client.post("/login", json={"username":"foo", "password":"bar"})
- 继承
HttpUser
,为每个用户提供一个client
属性,该属性是HttpSession
的实例,可用于向我们要加载测试的目标系统发出 HTTP 请求。当测试开始时,Locust 将为其模拟的每个用户创建一个此类的实例,并且每个用户都将开始在他们自己的绿色 gevent 线程中运行。要使文件成为有效的 locustfile,它必须包含至少一个继承自User
的类。 @task
方法是 locust 文件的核心。对于每个正在运行的用户,Locust 都会创建一个 greenlet(微线程),它将调用这些方法。其中一个方法被赋予了更高的权重3,没有权重时任务是随机选择的,分配不同的权重,代表执行的次数更多,这里权重是3,说明调用view_items的次数可能是 hello_world 的 三倍
HttpUser 不是真正的浏览器,因此不会解析 HTML 响应来加载资源或呈现页面。不过,它会跟踪 cookie。
@task(3)
def view_items(self):
for item_id in range(10):
self.client.get(f"/item?id={item_id}", name="/item")
time.sleep(1)
在 view_items
任务中,通过查询参数载入了10个不同的url,为了不让在locust的统计状态中显示,可以使用 "/item"
参数进行分组显示
自动生成 locustfile 文件
对于不习惯编写 locustfile 的初学者特别有用。har2locust 仍处于测试阶段。它可能并不总是生成正确的 locustfile,并且其界面可能会在版本之间更改。
User 类
用户类表示系统的一种用户/方案类型。当进行测试运行时,您可以指定要模拟的并发用户数,Locust 将为每个用户创建一个实例。你可以将任何你喜欢的属性添加到这些类/实例中,但有一些属性对 Locust 有特殊意义:
wait_time
的方法可以很容易地在每次任务执行后引入延迟。如果未指定wait_time,则下一个任务将在完成后立即执行。constant
在固定的时间内between
最小值和最大值之间的随机时间
示例:使每个用户在每次任务执行之间等待 0.5 到 10 秒:
from locust import User, task, between
class MyUser(User):
@task
def my_task(self):
print("executing my_task")
wait_time = between(0.5, 10)
constant_throughput
用于确保任务每秒运行(最多)X 次的自适应时间。constant_pacing
用于确保任务(最多)每 X 秒运行一次的自适应时间(它是 constant_throughput 的数学倒数)。
例如,如果希望 Locust 在峰值负载下每秒运行 500 次任务迭代,则可以使用 wait_time = constant_throughput(0.1) 和 5000 的用户计数。
等待时间只会限制吞吐量,而不能启动新用户来达到目标。因此,在我们的示例中,如果任务迭代时间超过 10 秒,则吞吐量将小于 500。
等待时间是在任务执行后应用的,因此,如果您的生成率很高,您最终可能会在爬坡期间超过您的目标。
weight 和 fixed_count 属性
如果希望模拟更多特定类型的用户,则可以在这些类上设置权重属性。例如,网络用户的可能性是移动用户的三倍:@task采用可选的权重参数,该参数可用于指定任务的执行比率
class WebUser(User):
weight = 3
...
class MobileUser(User):
weight = 1
...
也可以设置属性 fixed_count
。在这种情况下,权重属性将被忽略,并且将生成确切的计数用户。首先生成这些用户。在下面的示例中,将只生成一个 AdminUser 实例,以便进行一些特定的工作,更准确地控制请求计数,而不受用户总数的影响。
class AdminUser(User):
wait_time = constant(600)
fixed_count = 1
@task
def restart_app(self):
...
class WebUser(User):
...
host 属性
host 属性是要测试的主机的 URL 前缀(例如 https://google.com
)。它会自动添加到请求中,因此您可以这样做 self.client.get("/")
。可以在 Locust 的 Web UI 中或使用该 --host
选项在命令行上覆盖此值。
@tasks 和 task属性
启动负载测试时,将为每个模拟用户创建一个 User 类的实例,并且这些用户将开始在自己的绿色线程中运行。当这些用户运行时,他们会选择他们执行的任务,休眠一段时间,然后选择一个新任务,依此类推。User 类用 @task
装饰器将任务声明为其下的方法,也可以使用 tasks 属性指定任务
@task
装饰器:为用户添加任务的最简单方法是使用 @task
装饰器。
from locust import User, task, constant
class MyUser(User):
wait_time = constant(1)
@task
def my_task(self):
print("User instance (%r) executing my_task" % self)
tasks 属性:定义用户任务的另一种方法是设置 tasks
属性。
tasks 属性可以是 Tasks 列表,也可以是 字典,其中 Task 是 python 可调用对象或 TaskSet 类。如果任务是普通的 python 函数,则它们会收到一个参数,即执行任务的 User 实例。
from locust import User, constant
def my_task(user):
pass
class MyUser(User):
tasks = [my_task]
wait_time = constant(1)
如果将 tasks 属性指定为列表,则每次执行任务时,都会从 tasks 属性中随机选择该任务。但是,如果 tasks 是一个字典 - 将可调用对象作为键,int 作为值 - 将随机选择要执行的任务,但以 int 作为比率。因此,对于如下所示的任务:{my_task: 3, another_task: 1} 表示 my_task被执行可能性是another_task的 3 倍。在内部,上面的字典实际上将扩展为一个列表(并且属性 tasks
已更新),如下所示:[my_task, my_task, my_task, another_task],然后使用 Python random.choice()
从列表中选择任务。
@tag装饰器
使用 @tag
装饰器标记任务,然后通过 --tags
和 --exclude-tags
参数来选择在测试期间执行的任务。示例:
from locust import User, constant, task, tag
class MyUser(User):
wait_time = constant(1)
@tag('tag1')
@task
def task1(self):
pass
@tag('tag1', 'tag2')
@task
def task2(self):
pass
@tag('tag3')
@task
def task3(self):
pass
@task
def task4(self):
pass
使用 --tags tag1
启动此测试,则在测试期间将仅执行 task1 和 task2。如果以 --tags tag2 tag3
启动它,则只会执行 task2 和 task3。
event 事件
如果你想在测试中运行一些设置代码,通常把它放在 locustfile 的模块级别就足够了,但有时你需要在运行中的特定时间做一些事情。为了满足这一需求,Locust 提供了事件钩子。
from locust import events
@events.test_start.add_listener
def on_test_start(environment, **kwargs):
print("A new test is starting")
@events.test_stop.add_listener
def on_test_stop(environment, **kwargs):
print("A new test is ending")
init 初始化
该 init
事件在每个 locust 进程开始时触发。这在分布式模式下特别有用,在分布式模式下,每个工作进程(而不是每个用户)都需要机会进行一些初始化。例如,假设您有一些全局状态,从此过程中生成的所有用户都需要该状态:
from locust import events
from locust.runners import MasterRunner
@events.init.add_listener
def on_locust_init(environment, **kwargs):
if isinstance(environment.runner, MasterRunner):
print("I'm on master node")
else:
print("I'm on a worker or standalone node")
其他事件
参阅 extending locust using event hooks 以获取其他事件,以及如何使用它们的更多示例。
on_start 和 on_stop 方法
HttpUser 类
HttpUser
是最常用 User
的。它添加了一个 client
用于发出 HTTP 请求的属性。
from locust import HttpUser, task, between
class MyUser(HttpUser):
wait_time = between(5, 15)
@task(4)
def index(self):
self.client.get("/")
@task(1)
def about(self):
self.client.get("/about/")
client 属性 / HttpSession
client
是 HttpSession
的实例。HttpSession 是 requests.Session
的子类/包装器。就像 requests.Session
一样,它会在请求之间保留 cookie,因此可以轻松用于登录网站。
response = self.client.post("/login", {"username":"testuser", "password":"secret"})
print("Response status code:", response.status_code)
print("Response text:", response.text)
response = self.client.get("/my-profile")
验证 response
如果 HTTP 响应代码正常 (<400),则认为请求成功,但对响应进行一些额外的验证通常很有用。可以使用 catch_response 参数、with 语句和对 response.failure() 的调用将请求标记为失败
with self.client.get("/", catch_response=True) as response:
if response.text != "Success":
response.failure("Got wrong response")
elif response.elapsed.total_seconds() > 0.5:
response.failure("Request took too long")
您还可以将请求标记为成功,即使响应代码错误:
with self.client.get("/does_not_exist/", catch_response=True) as response:
if response.status_code == 404:
response.success()
您甚至可以通过抛出异常,然后在 with-block 之外捕获它来完全避免记录请求。或者你可以抛出一个 locust 异常让 locust 捕捉到它。
from locust.exception import RescheduleTask
...
with self.client.get("/does_not_exist/", catch_response=True) as response:
if response.status_code == 404:
raise RescheduleTask()
对请求进行分组
网站的 URL 包含某种动态参数的页面很常见。通常,在用户的统计信息中将这些 URL 组合在一起是有意义的。这可以通过将 name 参数传递给 HttpSession's
不同的请求方法来完成。
for i in range(10):
self.client.get("/blog?id=%i" % i, name="/blog?id=[id]")
在某些情况下,可能无法将参数传递到请求函数中,例如在与包装请求会话的库/SDK 交互时。通过设置 client.request_name
属性,提供了对请求进行分组的另一种方法。
self.client.request_name="/blog?id=[id]"
for i in range(10):
self.client.get("/blog?id=%i" % i)
self.client.request_name=None
如果要使用最少的样板链接多个分组,则可以使用 client.rename_request()
上下文管理器。
@task
def multiple_groupings_example(self):
# Statistics for these requests will be grouped under: /blog/?id=[id]
with self.client.rename_request("/blog?id=[id]"):
for i in range(10):
self.client.get("/blog?id=%i" % i)
# Statistics for these requests will be grouped under: /article/?id=[id]
with self.client.rename_request("/article?id=[id]"):
for i in range(10):
self.client.get("/article?id=%i" % i)
使用 catch_response 并直接访问request_meta,您甚至可以根据响应中的某些内容重命名请求。
with self.client.get("/", catch_response=True) as resp:
resp.request_meta["name"] = resp.json()["name"]
HTTP 代理设置
连接池
由于每个 HttpUser
都会创建新的 HttpSession
,所以每个用户实例都有自己的连接池。这类似于真实用户与 Web 服务器的交互方式。
但是,如果要在所有用户之间共享连接,则可以使用单个池管理器。为此,请将 class 属性设置为 pool_manager
的 urllib3.PoolManager
实例。
from locust import HttpUser
from urllib3 import PoolManager
class MyUser(HttpUser):
# All users will be limited to 10 concurrent connections at most.
pool_manager = PoolManager(maxsize=10, block=True)
TaskSets 任务集
TaskSets 是一种对分层网站/系统进行结构化测试的方法。You can read more about it here.
Examples 示例
这里有很多 locustfile 示例: here
分布式 负载
如果测试计划很复杂,或者想要运行更多负载,则需要横向扩展到多个进程,甚至可能是多台计算机。由于 Python 无法充分利用每个进程的多个内核(参见 GIL),因此您可能需要为每个处理器内核运行一个工作器实例才能访问所有计算能力。
为此,您可以使用该 --master
标志在主模式下启动一个 Locust 实例,并使用该 --worker
标志启动多个工作实例。如果工作线程与主服务器不在同一台计算机上,则用于 --master-host
将他们指向运行主服务器的计算机的 IP/主机名。
为了简化此操作,您可以使用该 --processes
标志启动多个实例。默认情况下,它将启动一个主进程和指定数量的工作进程。与 --worker
它结合使用只会启动 worker。
子进程使用 fork 启动,这在 Windows 中不可用。
主实例运行 Locust 的 Web 界面,并告诉 worker 何时生成/停止用户。工作线程实例运行您的用户并将统计信息发送回主服务器。主实例本身不运行任何用户。
每个工作线程可以运行的用户数几乎没有限制。Locust/gevent 每个进程可以运行数千甚至数万个用户,只要它们的总请求速率 (RPS) 不太高。
如果 Locust 即将耗尽 CPU 资源,它将记录警告。如果没有警告,您可以非常确定您的测试不受负载生成器 CPU 的限制。
示例 1:单台机器
启动一个主进程和 4 个工作进程非常简单:locust --processes 4
也可以自动检测机器中的内核数量,并为每个内核启动一个工作线程:locust --processes -1
示例 2:多台机器
在一台机器上以主模式启动蝗虫:locust -f my_locustfile.py --master
然后在每台工作机器上:locust -f my_locustfile.py --worker --master-host <your master's address> --processes 4
请注意,主节点和工作节点都需要访问 locustfile,它不会自动从 master 发送到 worker。但是你可以使用 locust-swarm 来自动化它。
跨节点通信
在分布式模式下运行 Locust 时,您可能希望在主节点和工作节点之间进行通信以协调数据。这可以通过使用内置消息挂钩的自定义消息轻松完成:
from locust import events
from locust.runners import MasterRunner, WorkerRunner
# Fired when the worker receives a message of type 'test_users'
def setup_test_users(environment, msg, **kwargs):
for user in msg.data:
print(f"User {user['name']} received")
environment.runner.send_message('acknowledge_users', f"Thanks for the {len(msg.data)} users!")
# Fired when the master receives a message of type 'acknowledge_users'
def on_acknowledge(msg, **kwargs):
print(msg.data)
@events.init.add_listener
def on_locust_init(environment, **_kwargs):
if not isinstance(environment.runner, MasterRunner):
environment.runner.register_message('test_users', setup_test_users)
if not isinstance(environment.runner, WorkerRunner):
environment.runner.register_message('acknowledge_users', on_acknowledge)
@events.test_start.add_listener
def on_test_start(environment, **_kwargs):
if not isinstance(environment.runner, WorkerRunner):
users = [
{"name": "User1"},
{"name": "User2"},
{"name": "User3"},
]
environment.runner.send_message('test_users', users)
FastHttpUser
使用更快的 HTTP 客户端提高性能: Increase performance with a faster HTTP client.
Locust 的默认 HTTP 客户端使用 python-requests。
from locust import task, FastHttpUser
class MyUser(FastHttpUser):
@task
def index(self):
response = self.client.get("/")
Locust 还附带使用 geventhttpclient 实现的 FastHttpUser
以非常高的吞吐量运行测试。它提供了一个非常相似的 API,并且使用的 CPU 时间要少得多,有时在给定硬件上每秒的最大请求数会增加 5 倍到 6 倍。假设单个 Locust 进程(仅限于一个 CPU 内核)使用 FastHttpUser 每秒可以执行大约 16000 个请求,使用 HttpUser 每秒可以执行 4000 个请求
只要负载生成器 CPU 没有过载,FastHttpUser 的响应时间应该与 HttpUser 的响应时间几乎相同。它不会更快地提出单个请求。
单个 FastHttpUser/geventhttpclient 会话可以并发执行请求,只需为每个请求启动 greenlets:
@task
def t(self):
def concurrent_request(url):
self.client.get(url)
pool = gevent.pool.Pool()
urls = ["/url1", "/url2", "/url3"]
for url in urls:
pool.spawn(concurrent_request, url)
pool.join()
在调试器中运行测试
在调试器中运行 Locust 在开发测试时非常有用。除此之外,您可以检查特定的响应或检查某些用户实例变量。
但是调试器有时会遇到像 Locust 这样的复杂 gevent 应用程序的问题,而且框架本身发生了很多事情,您可能不感兴趣。为了简化这一点,Locust 提供了一种称为 run_single_user
:
from locust import HttpUser, task, run_single_user
class QuickstartUser(HttpUser):
host = "http://localhost"
@task
def hello_world(self):
with self.client.get("/hello", catch_response=True) as resp:
pass # maybe set a breakpoint here to analyze the resp object?
# if launched directly, e.g. "python3 debugging.py", not "locust -f debugging.py"
if __name__ == "__main__":
run_single_user(QuickstartUser)
它隐式地为请求事件注册一个事件处理程序,以打印有关每个请求的一些统计信息:
可以通过将参数指定为 run_single_user
来准确配置打印的内容。
打印 HTTP 通信
对于 HttpUser
( python-requests):
# put this at the top of your locustfile (or just before the request you want to trace)
import logging
from http.client import HTTPConnection
HTTPConnection.debuglevel = 1
logging.basicConfig()
logging.getLogger().setLevel(logging.DEBUG)
requests_log = logging.getLogger("requests.packages.urllib3")
requests_log.setLevel(logging.DEBUG)
requests_log.propagate = True
对于 FastHttpUser
( geventhttpclient):
import sys
...
class MyUser(FastHttpUser):
@task
def t(self):
self.client.get("http://example.com/", debug_stream=sys.stderr)
无头 模式
可以在没有 Web UI 的情况下运行 locust 通过将 --headless
与 -u/--users
和 -r/--spawn-rate
一起使用:locust -f locust_files/my_locust_file.py --headless -u 100 -r 5
即使在无头模式下,您也可以在测试运行时更改用户计数。按下 w
可添加 1 个用户或 W
添加 10 个用户。按下 s
可移除 1 或 S
移除 10。
若要指定测试的运行时间,请使用
-t/--run-time
:locust --headless -u 100 --run-time 1h30m
$ locust --headless -u 100 --run-time 60 # default unit is seconds
默认情况下,Locust 会立即停止您的任务(甚至无需等待请求完成)。要给正在运行的任务一些时间来完成迭代,请使用 -s/--stop-timeout
:locust --headless --run-time 1h30m --stop-timeout 10s
如果要在没有 Web UI 的情况下运行 Locust 分布式,则应在启动主节点时指定 --expect-workers
选项,以指定预期连接的工作节点数。然后,它将等到许多工作节点连接后再开始测试。
Event hooks
Locust 带有许多事件钩子,可用于以不同的方式扩展 Locust。
例如,下面介绍如何设置在请求完成后触发的事件侦听器:
from locust import events
@events.request.add_listener
def my_request_handler(request_type, name, response_time, response_length, response,
context, exception, start_time, url, **kwargs):
if exception:
print(f"Request to {name} failed with exception {exception}")
else:
print(f"Successfully made a request to: {name}")
print(f"The response was {response.text}")
在分布式模式下运行 locust 时,在运行测试之前在工作节点上进行一些设置可能很有用。您可以通过检查节点的类型来检查以确保您没有在主节点上运行 runner
:
from locust import events
from locust.runners import MasterRunner
@events.test_start.add_listener
def on_test_start(environment, **kwargs):
if not isinstance(environment.runner, MasterRunner):
print("Beginning test setup")
else:
print("Started test from Master node")
@events.test_stop.add_listener
def on_test_stop(environment, **kwargs):
if not isinstance(environment.runner, MasterRunner):
print("Cleaning up test data")
else:
print("Stopped test from Master node")
若要查看可用事件的完整列表,请参阅事件挂钩。Event hooks.
请求上下文
有一个 request event
context 参数,使您能够传递有关请求的数据(例如用户名、标签等)。它可以直接在对请求方法的调用中设置,也可以在用户级别通过重写 User.context() 方法进行设置。
class MyUser(HttpUser):
@task
def t(self):
self.client.post("/login", json={"username": "foo"})
self.client.get("/other_request", context={"username": "foo"})
@events.request.add_listener
def on_request(context, **kwargs):
if context:
print(context["username"])
来自用户实例的上下文:
class MyUser(HttpUser):
def context(self):
return {"username": self.username}
@task
def t(self):
self.username = "foo"
self.client.post("/login", json={"username": self.username})
@events.request.add_listener
def on_request(context, **kwargs):
print(context["username"])
响应中值的上下文,使用catch_response:
with self.client.get("/", catch_response=True) as resp:
resp.request_meta["context"]["requestId"] = resp.json()["requestId"]
添加 Web 路由
Locust 使用 Flask 来提供 Web UI,因此很容易将 Web 端点添加到 Web UI。通过侦听事件 init
,我们可以检索对 Flask 应用实例的引用,并使用它来设置新路由:
from locust import events
@events.init.add_listener
def on_locust_init(environment, **kw):
@environment.web_ui.app.route("/added_page")
def my_added_page():
return "Another page"
您现在应该能够启动 Locust 并浏览到 http://127.0.0.1:8089/added_page
扩展 Web UI
作为添加简单 Web 路由的替代方法,您可以使用 Flask 蓝图和模板不仅可以添加路由,还可以扩展 Web UI,以便与内置的 Locust 统计信息一起显示自定义数据。这更高级,因为它还涉及编写和包含路由提供的 HTML 和 Javascript 文件,但可以大大增强 Web UI 的实用性和可定制性。
扩展 Web UI 的工作示例,包括 HTML 和 Javascript 示例文件,可以在 Locust 源代码的 examples 目录中找到。
运行后台 greenlet
因为蝗虫文件“只是代码”,所以没有什么能阻止你生成自己的 greenlet 与你的实际负载/用户并行运行。
import gevent
from locust import events
from locust.runners import STATE_STOPPING, STATE_STOPPED, STATE_CLEANUP, MasterRunner, LocalRunner
def checker(environment):
while not environment.runner.state in [STATE_STOPPING, STATE_STOPPED, STATE_CLEANUP]:
time.sleep(1)
if environment.runner.stats.total.fail_ratio > 0.2:
print(f"fail ratio was {environment.runner.stats.total.fail_ratio}, quitting")
environment.runner.quit()
return
@events.init.add_listener
def on_locust_init(environment, **_kwargs):
# dont run this on workers, we only care about the aggregated numbers
if isinstance(environment.runner, MasterRunner) or isinstance(environment.runner, LocalRunner):
gevent.spawn(checker, environment)
Logging 日志
Locust 使用 Python 内置的日志记录框架来处理日志记录。
使用 locust 进行爬虫
既然可以发起请求测试,那么肯定可以用来爬虫爬数据。
:https://docs.locust.io/en/latest/writing-a-locustfile.html#user-class
client 属性 / HttpSession
client
是 HttpSession
的实例。HttpSession 是 requests.Session
的子类/包装器。就像 requests.Session
一样,它会在请求之间保留 cookie,因此可以轻松用于登录网站。
response = self.client.post("/login", {"username":"testuser", "password":"secret"})
print("Response status code:", response.status_code)
print("Response text:", response.text)
response = self.client.get("/my-profile")
验证 response
如果 HTTP 响应代码正常 (<400),则认为请求成功,但对响应进行一些额外的验证通常很有用。可以使用 catch_response 参数、with 语句和对 response.failure() 的调用将请求标记为失败
with self.client.get("/", catch_response=True) as response:
if response.text != "Success":
response.failure("Got wrong response")
elif response.elapsed.total_seconds() > 0.5:
response.failure("Request took too long")
您还可以将请求标记为成功,即使响应代码错误:
with self.client.get("/does_not_exist/", catch_response=True) as response:
if response.status_code == 404:
response.success()
您甚至可以通过抛出异常,然后在 with-block 之外捕获它来完全避免记录请求。或者你可以抛出一个 locust 异常让 locust 捕捉到它。
from locust.exception import RescheduleTask
...
with self.client.get("/does_not_exist/", catch_response=True) as response:
if response.status_code == 404:
raise RescheduleTask()
对请求进行分组
网站的 URL 包含某种动态参数的页面很常见。通常,在用户的统计信息中将这些 URL 组合在一起是有意义的。这可以通过将 name 参数传递给 HttpSession's
不同的请求方法来完成。
for i in range(10):
self.client.get("/blog?id=%i" % i, name="/blog?id=[id]")
在某些情况下,可能无法将参数传递到请求函数中,例如在与包装请求会话的库/SDK 交互时。通过设置 client.request_name
属性,提供了对请求进行分组的另一种方法。
self.client.request_name="/blog?id=[id]"
for i in range(10):
self.client.get("/blog?id=%i" % i)
self.client.request_name=None
如果要使用最少的样板链接多个分组,则可以使用 client.rename_request()
上下文管理器。
@task
def multiple_groupings_example(self):
# Statistics for these requests will be grouped under: /blog/?id=[id]
with self.client.rename_request("/blog?id=[id]"):
for i in range(10):
self.client.get("/blog?id=%i" % i)
# Statistics for these requests will be grouped under: /article/?id=[id]
with self.client.rename_request("/article?id=[id]"):
for i in range(10):
self.client.get("/article?id=%i" % i)
使用 catch_response 并直接访问request_meta,您甚至可以根据响应中的某些内容重命名请求。
with self.client.get("/", catch_response=True) as resp:
resp.request_meta["name"] = resp.json()["name"]
HTTP 代理设置
连接池
由于每个 HttpUser
都会创建新的 HttpSession
,所以每个用户实例都有自己的连接池。这类似于真实用户与 Web 服务器的交互方式。
但是,如果要在所有用户之间共享连接,则可以使用单个池管理器。为此,请将 class 属性设置为 pool_manager
的 urllib3.PoolManager
实例。
from locust import HttpUser
from urllib3 import PoolManager
class MyUser(HttpUser):
# All users will be limited to 10 concurrent connections at most.
pool_manager = PoolManager(maxsize=10, block=True)
限制 并发数
注意:locust 如果未指定 wait_time 则一个任务完成后立即执行下一个任务。所以一定要限制并发数,防止对网站造成 DDOS 共计。可以在每个任务之间设置等待时间,来限制并发数。
User 类的
wait_time
的方法可以很容易地在每次任务执行后引入延迟。class MyUser(User):
# wait between 3.0 and 10.5 seconds after each task
wait_time = between(3.0, 10.5)
constant
在固定的时间内
from locust import User, task, constant
class MyUser(User):
wait_time = constant(1)
@task
def my_task(self):
print("User instance (%r) executing my_task" % self)
between
最小值和最大值之间的随机时间
from locust import User, task, between
class MyUser(User):
@task
def my_task(self):
print("executing my_task")
wait_time = between(0.5, 10)
例如,如果希望 Locust 在峰值负载下每秒运行 500 次任务迭代,则可以使用 wait_time = constant_throughput(0.1) 和 5000 的用户计数。
等待时间只会限制吞吐量,而不能启动新用户来达到目标。因此,在我们的示例中,如果任务迭代时间超过 10 秒,则吞吐量将小于 500。
分布式 爬虫
可以使用redis 作为任务队列,每个 work 从redis 获取任务
示例 代码
test.py
from gevent import monkey; monkey.patch_all()
import gevent
import redis
from locust import task, FastHttpUser, constant
from locust.exception import RescheduleTask
from bs4 import BeautifulSoup
rk = "task_queue"
redis_conn = redis.StrictRedis(host='127.0.0.1', port=6379, db=0)
class MyUser(FastHttpUser):
wait_time = constant(0.5) // 用来限制并发, 每0.5秒发送一个请求
def get_redis_task(self):
task_byte = redis_conn.spop(rk)
if not task_byte:
return None
task_string = task_byte.decode('utf8')
return task_string
@task
def crawl(self):
task_string = self.get_redis_task()
if not task_string:
raise RescheduleTask()
return None
req_url = task_string
with self.client.get(req_url, catch_response=True) as response:
if response.status_code != 200:
response.failure("请求失败")
else:
response.encoding = "utf-8"
resp_text = response.text
soup = BeautifulSoup(resp_text, "html.parser", from_encoding='utf-8')
img_list = soup.find_all('img')
list(map(lambda x=None: print(x["src"]), img_list))
def add_redis_task():
# https://www.2meinv.com
for page_num in range(1, 100):
url = f"https://www.2meinv.com/index-{page_num}.html"
redis_conn.sadd(rk, url)
pass
if __name__ == '__main__':
gevent.spawn(add_redis_task).join()
首先添加任务:python test.py
然后 locust 执行:locust -f test.py
浏览器打开 url