Python3+unittest+request 实现接口自动化测试 (完整简单案例) 您所在的位置:网站首页 为什么要对接口进行测试 Python3+unittest+request 实现接口自动化测试 (完整简单案例)

Python3+unittest+request 实现接口自动化测试 (完整简单案例)

2024-07-14 21:02| 来源: 网络整理| 查看: 265

Python3+unittest+request 实现接口自动化测试 (完整简单案例) 前言一、准备工作1. 建好项目及相关文件夹2. 准备好相关文档 二、代码部分1. readConfig.py读取config.ini配置文件2. 创建公用类3. 测试用例3. 执行所有用例 结果展示总结

前言

小白个人研究成果,仅供参考

一、准备工作 1. 建好项目及相关文件夹 common - 通用类,用于存放所有可以共用的类,例如写入日志、连接数据库,连接接口等result - 用于存放所有结果,例如生成的日志文件以及生成的html测试报告testCase - 存放所有测试用例testFile - 存放相关文档没有存放在文件夹里面的还有 caselist.txt - 存放所有要执行的用例的路径 config.ini - 配置文件 readConfig.py - 读取配置文件config.ini runAll.py - 测试入口 在这里插入图片描述 2. 准备好相关文档

testFile 文件夹下:

case_demo.xls 用于存放所有要执行测试用例路径(这个可以后面写,但先讲) 内容如下: 包含 case_name:用例名称、method:请求方式,url:接口路径,params:参数,code:响应码, meaasge:响应内容 在这里插入图片描述sql.xml 存放所有要执行的sql SELECT * FROM t_organization_info WHERE city = %s SELECT * FROM t_organization_info WHERE org_code_type = %s and city = %s SELECT * FROM t_case

最外层文件: caselist.txt 存放要执行的测试用例路径 在这里插入图片描述 config.ini 存放配置文件,例如数据库配置、邮箱配置、http请求配置等 email 参数讲解: code:是邮件授权码(不是密码哦,具体参考https://baijiahao.baidu.com/s?id=1552315463915496&wfr=spider&for=pc) smpt:是邮箱服务器地址,我用的是qq邮箱 sender:发送者 recerives :接收者,可以写多个 subject :邮件标题

[DATABASE] host = 127.0.0.1 port = 3306 name = root pwd = 123456 database = db_demo_org [DATABASE_CASE] host = 127.0.0.1 port = 3306 name = root pwd = 123456 database = demo [HTTP] url = http://localhost:8086/quota/ timeout = 3 [EMAIL] smtp = smtp.qq.com sender = [email protected] recerives = [email protected] code = xxxxxxxxxqxoifaa subject = 接口自动化测试报告 content = 您好,附件是接口自动化测试报告,请查收。 on_off = 1 二、代码部分 1. readConfig.py读取config.ini配置文件

创建readConfig.py

#!/usr/bin/env python # -*- coding:utf-8 -*- import os import codecs import configparser # 获取当前py文件路径的上一层 localDir = os.path.split(os.path.realpath(__file__))[0] # 拼接成config.ini完整路径 configPath = os.path.join(localDir, 'config.ini') class ReadConfig: """读取config.ini文件""" def __init__(self): # 打开文件 config_file = open(configPath, encoding='UTF-8') data = config_file.read() # 判断是否带BOM文件 if data[:3] == codecs.BOM_UTF8: data = data[3:] file = codecs.open(configPath, 'w', encoding='UTF-8') # 改写文件 file.write(data) file.close() config_file.close() # 创建configparser实例 self.cf = configparser.ConfigParser() # 读取配置文件 self.cf.read(configPath, encoding='UTF-8') # 获取配置文件email的参数 def get_email(self, name): return self.cf.get('EMAIL', name) # 获取配置文件http的参数 def get_http(self, name): return self.cf.get('HTTP', name) # 获取配置文件database的参数 def get_database(self, name): return self.cf.get('DATABASE', name) # 获取配置文件database_case的参数 def get_database_case(self, name): return self.cf.get('DATABASE_CASE', name) 2. 创建公用类

common文件夹下(以下几个py文件我就不分先后顺序介绍了):

configDB.py 操作数据库 #!/usr/bin/env python # -*- coding:utf-8 -*- import MySQLdb # python3安装命令:pip3 install Mysqlclien from readConfig import ReadConfig from common.myLog import MyLog myConfig = ReadConfig() class MyDB: """数据库""" global host, name, pwd, port, database, config # 获取配置文件中的参数 host = myConfig.get_database("host") name = myConfig.get_database("name") pwd = myConfig.get_database("pwd") port = myConfig.get_database("port") database = myConfig.get_database("database") config = { 'host': str(host), 'user': name, 'password': pwd, 'port': int(port), 'db': database, 'charset': 'utf8' } def __init__(self): self.logger = MyLog.get_log().logger self.db = None self.cursor = None def connect_db(self): """连接数据库""" try: self.db = MySQLdb.connect(**config) self.cursor = self.db.cursor() print('数据库已连接成功') except ConnectionError as e: self.logger.error(str(e)) def execute_sql(self, sql, params): """执行sql并返回操作游标""" try: self.connect_db() self.cursor.execute(sql, params) self.db.commit() return self.cursor except RuntimeError as e: self.db.rollback() def get_all(self, cursor): """获取全部数据""" value = cursor.fetchall() return value def get_one(self, cursor): """获取第一条数据""" value = cursor.fetchone() return value def close_db(self): """关闭数据库""" self.db.close() print('数据库已关闭') configDBCase.py 还是操作数据库,但我这里是另一个数据库专门处理测试用例的(本文读取测试用例一共用了两种方式 1-读取数据库,2-读取xls文档) #!/usr/bin/env python # -*- coding:utf-8 -*- import MySQLdb # python3安装命令:pip3 install Mysqlclien from readConfig import ReadConfig from common.myLog import MyLog myConfig = ReadConfig() class MyDBCase: """测试用例数据库""" global host, name, pwd, port, database, config # 获取配置文件中的参数 host = myConfig.get_database_case("host") name = myConfig.get_database_case("name") pwd = myConfig.get_database_case("pwd") port = myConfig.get_database_case("port") database = myConfig.get_database_case("database") config = { 'host': str(host), 'user': name, 'password': pwd, 'port': int(port), 'db': database, 'charset': 'utf8' } def __init__(self): """初始化数据""" self.logger = MyLog.get_log().logger self.db = None self.cursor = None def connect_db(self): """连接数据库""" try: self.db = MySQLdb.connect(**config) self.cursor = self.db.cursor() print('数据库已连接成功') except ConnectionError as e: self.logger.error(str(e)) def execute_sql(self, sql, params): """执行sql并返回操作游标""" try: self.cursor.execute(sql, params) self.db.commit() return self.cursor except RuntimeError as e: self.db.rollback() def get_all(self, cursor): """返回全部数据""" value = cursor.fetchall() return value def get_one(self, cursor): """返回第一条数据""" value = cursor.fetchone() return value def close_db(self): """关闭数据库连接""" self.db.close() print('数据库已关闭') configEmail.py 处理邮件 #!/usr/bin/env python # -*- coding:utf-8 -*- import time import yagmail from readConfig import ReadConfig from common.myLog import MyLog myConfig = ReadConfig() class Email: """邮件处理""" def __init__(self): global smtp, sender, recerives, code, title, content # 读取配置文件中的数据 smtp = myConfig.get_email('smtp') sender = myConfig.get_email('sender') recerives = myConfig.get_email('recerives') code = myConfig.get_email('code') title = myConfig.get_email('subject') content = myConfig.get_email('content') # 获取当前时间 date = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()) self.subject = title + date self.log = MyLog.get_log() self.logger = self.log.logger pass def send_email(self, accessory): """发送邮件""" # 参数:发送者,邮件授权码(不是密码哦,具体参考https://baijiahao.baidu.com/s?id=1552315463915496&wfr=spider&for=pc),邮箱服务器地址 yag = yagmail.SMTP(user=sender, password=code, host=smtp) # 发送邮件,参数:接收者,标题,内容,附件 yag.send(recerives, self.subject, content, accessory) # 关闭 yag.close() configHttp.py 请求接口 #!/usr/bin/env python # -*- coding:utf-8 -*- import requests import json # 引入写好的几个类 from readConfig import ReadConfig from common.myLog import MyLog # 实例化ReadConfig类 myConfig = ReadConfig() class ConfigHttp: """请求接口类""" # 全局变量token token = {} def __init__(self): """初始化""" global timeout timeout = myConfig.get_http('timeout') self.logger = MyLog.get_log().logger self.url = '' self.headers = {} self.params = {} self.data = {} self.files = {} pass def set_headers(self, headers): """设置参数""" self.headers = headers def set_params(self, params): self.params = params def set_data(self, data): self.data = data def set_files(self, files): self.files = files def set_url(self, url): self.url = url def get_requests(self): try: return requests.get(url=myConfig.get_http('url') + self.url, params=self.params, headers=self.headers, timeout=float(timeout)) except TimeoutError: self.logger.error('请求超时') return None def post_requests(self): """post请求方式""" try: return requests.post(url=myConfig.get_http('url') + self.url, json=self.data, files=self.files, headers=self.headers, timeout=float(timeout)) except TimeoutError: self.logger.error('请求超时') return None myLog.py 处理日志 #!/usr/bin/env python # -*- coding:utf-8 -*- import os import time import logging import threading import readConfig logPath = '' resultPath = '' localDir = '' class Log: """日志""" def __init__(self): """初始化""" global logPath, resultPath, localDir # 获取readConfig.py中的localDir参数 localDir = readConfig.localDir # 拼接result文件夹完整路径 resultPath = os.path.join(localDir, "result") # 判断文件是否存在,否则创建 if not os.path.exists(resultPath): os.mkdir(resultPath) # 创建日志文件 date = time.strftime('%Y%m%d', time.localtime()) logPath = os.path.join(resultPath, date) if not os.path.exists(logPath): os.mkdir(logPath) # 创建logger实例 self.logger = logging.getLogger() # 定义logger级别 self.logger.setLevel(logging.INFO) # 自定义Handle日志规则,如果是logging.StreamHandler则是输出到控制台,FileHandle则是输出到文件 handler = logging.FileHandler(os.path.join(logPath, 'output.log')) # 创建日志格式实例:%(asctime)s:打印日志的时间, %(levelname)s:打印日志级别的名称, %(message)s:打印日志信息 formater = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') # handler自定义格式 handler.setFormatter(formater) # logger日志加载handle实例 self.logger.addHandler(handler) pass class MyLog: log = None # 创建线程锁 threadLock = threading.Lock() def __init__(self): pass # 申明静态方法,无需实例化 @staticmethod def get_log(): if MyLog.log is None: # 开启锁 MyLog.threadLock.acquire() # 调用Log类 MyLog.log = Log() # 释放锁 MyLog.threadLock.release() return MyLog.log pass readCase.py 读取xml、xls文档 #!/usr/bin/env python # -*- coding:utf-8 -*- import os from xlrd import open_workbook # 下载参考:https://blog.csdn.net/ZMJ_566/article/details/89360918 from common.configHttp import ConfigHttp from common.myLog import MyLog import readConfig from xml.etree import ElementTree localConfigHttp = ConfigHttp() logger = MyLog.get_log().logger database = {} class ReadCase: """读取文档""" def get_xls(self, xls_name, sheet_name): """读取xls文档。xls_name:文档名称,sheet_name:工作表sheet名称 """ cls = [] # 拼接地址 xlsPath = os.path.join(readConfig.localDir, 'testFile', xls_name) # 打开文档 file = open_workbook(xlsPath) # 根据工作表名获取工作表信息 sheet = file.sheet_by_name(sheet_name) # 获取工作表总行数 nrows = sheet.nrows # 遍历并忽略第一行表头 for i in range(1, nrows): cls.append(sheet.row_values(i)) return cls def get_xml(self): """读取xml文档""" if len(database) == 0: # 拼接地址 sql_path = os.path.join(readConfig.localDir, 'testFile', 'sql.xml') # 解析xml tree = ElementTree.parse(sql_path) # 遍历所有database标签 for db in tree.findall("database"): # 获取标签的name属性 db_name = db.get("name") table = {} for tb in db: table_name = tb.get("name") sql = {} for data in tb: sql_id = data.get("id") sql[sql_id] = data.text table[table_name] = sql database[db_name] = table def get_sql(self, database_name, table_name, sql_id): """根据数据库名、表名、sql_id获取sql""" try: self.get_xml() sql = database.get(database_name).get(table_name).get(sql_id) return sql except RuntimeError as e: logger.info('%s:获取的数据库名或表名或sql_id不存在', e) 3. 测试用例

在这里插入图片描述

testLogin.py 这里只用到了简单的请求参数 #!/usr/bin/env python # -*- coding:utf-8 -*- import unittest from common.configHttp import ConfigHttp from common.myLog import MyLog logger = MyLog.get_log().logger configHttp = ConfigHttp() class Login(unittest.TestCase): """登录""" def setUp(self): print('*****启动Login用例*****') # 这三个用例是我凑数用的 def test_one(self): """凑数用例1""" logger.info('test_one Method') def test_two(self): """凑数用例2""" logger.info('test_two Method') def test_three(self): """凑数用例3""" logger.info('test_three Method') def test_login(self): """登录接口""" # 请求参数 data = {'login': 'admin', 'password': '202cb962axxxxxxxxxxxxxxxxx0'} # 接口路径 url = 'eLogin' # 调用configHttp的相关方法 configHttp.set_url(url) configHttp.set_data(data) result = configHttp.post_requests() result_json = result.json() # 判断字段是否存在与json中 if 'code' in result_json: if result_json['code'] == 200 and result_json['message'] == '成功': logger.info('登录成功!') # 将token设置到configHttp中方便其他类调用 configHttp.token['token'] = result_json['data']['token'] pass def tearDown(self): print('*****销毁Login用例*****') testOrganizationInfo.py 这里用到了读取xml获取sql并执行,读取xls文档执行接口 #!/usr/bin/env python # -*- coding:utf-8 -*- import unittest import json from common.configDB import MyDB from common.readCase import ReadCase from common.myLog import MyLog from common.configHttp import ConfigHttp db = MyDB() readCase = ReadCase() logger = MyLog().get_log().logger class OrganizationInfo(unittest.TestCase): """组织机构""" def setUp(self): logger.info('*****启动OrganizationInfo用例*****') # 这里演示一下读取xml并操作数据库 def test_selectByCity(self): """读取xml中的sql并执行""" # 连接数据库 db.connect_db() # 读取xml获取sql sql = readCase.get_sql('db_demo_org', 't_organization_info', 'selectByCity') params = (u'广州市',) # 执行sql并返回操作游标 cursor = db.execute_sql(sql, params) # 获取全部数据 datas = db.get_all(cursor) for data in datas: print(data) # 关闭数据库 db.close_db() logger.info('test_selectByCity Method') def test_selectByTypeAndCity(self): """读取xml中的sql并执行""" # 连接数据库 db.connect_db() # 读取xml获取sql sql = readCase.get_sql('db_demo_org', 't_organization_info', 'selectByTypeAndCity') params = ('1', u'广州市') # 执行sql并返回游标 cursor = db.execute_sql(sql, params) # 获取全部数据 datas = db.get_all(cursor) for data in datas: print(data) # 关闭数据库 db.close_db() logger.info('test_selectByIdAndCity Method') def test_read_xls(self): """读取xls文档获取用例并执行""" # 读取xls文档获取用例 datas = readCase.get_xls('case_demo.xls', 'organizarion') configHttp = ConfigHttp() headers = {'Content-Type': 'application/json', 'Authorization': configHttp.token['token']} configHttp.set_headers(headers) for data in datas: # 设置参数 configHttp.set_url(data[2]) if data[1] == 'post': # 这里传过来是字符串,需要转成字典dict类型,否则接口报错(我的接口参数是map) configHttp.set_data(json.loads(data[3])) # 执行post请求 result = configHttp.post_requests() else: configHttp.set_params(data[3]) # 执行get请求 result = configHttp.get_requests() # 数据转json result_json = result.json() # 判断字段是否存在与json中 if 'code' in result_json: if result_json['code'] != 200: logger.error('接口异常') # 手动抛出用例异常 raise RuntimeError('接口异常') else: logger.error('接口异常') # 手动抛出用例异常 raise RuntimeError('接口异常') logger.info('测试organization接口') def tearDown(self): logger.info('*****销毁OrganizationInfo用例*****') if __name__ == '__main__': OrganizationInfo().test_selectByTypeAndCity() testUser.py 这里用到了读取数据库执行测试用例 #!/usr/bin/env python # -*- coding:utf-8 -*- import unittest import json from common.myLog import MyLog from common.configHttp import ConfigHttp from common.configDBCase import MyDBCase from common.readCase import ReadCase logger = MyLog.get_log().logger db = MyDBCase() configHttp = ConfigHttp() readCase = ReadCase() class User(unittest.TestCase): """用户""" # setUp(self) -> None 是 python3的写法,等于cls.func = none def setUp(self) -> None: logger.info('*****启动user用例*****') def test_read_db(self): """读取数据库中的测试用例并执行""" # 连接数据库 db.connect_db() # 获取xml文件的sql sql = readCase.get_sql('demo', 't_case', 'selectCaseAll') # 执行sql cursor = db.execute_sql(sql, '') # 返回全部数据 datas = db.get_all(cursor) # 设置header headers = {'Content-Type': 'application/json', 'Authorization': configHttp.token['token']} configHttp.set_headers(headers) for data in datas: # 设置url和参数 configHttp.set_url(data[3]) # map类型参数这里需要字符串转字典 configHttp.set_data(json.loads(data[4])) if data[2] == 'post': # 执行post请求 result = configHttp.post_requests() else: # 执行get请求 result = configHttp.get_requests() # 数据转json result_json = result.json() if 'code' in result_json: if result_json['code'] != data[5]: logger.error('接口异常') # 手动抛出异常 raise RuntimeError('接口异常') else: logger.error('接口异常') # 手动抛出异常 raise RuntimeError('接口异常') print(data) # 关闭数据库 db.close_db() logger.info('*****销毁test_read_db Method*****') pass def tearDown(self) -> None: logger.info('*****测试user用例*****') if __name__ == '__main__': unittest.main

数据库表:

CREATE TABLE `t_case` ( `id` int(11) NOT NULL AUTO_INCREMENT, `name` varchar(255) NOT NULL COMMENT '用例名称', `method` varchar(255) NOT NULL COMMENT '方法', `url` varchar(255) NOT NULL COMMENT '路径', `param` varchar(255) NOT NULL COMMENT '参数', `code` int(255) DEFAULT NULL COMMENT '编码', `message` varchar(255) DEFAULT NULL COMMENT '消息', PRIMARY KEY (`id`) ) ENGINE=MyISAM AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4;

表内容: 在这里插入图片描述

3. 执行所有用例

runAll.py

#!/usr/bin/env python # -*- coding:utf-8 -*- import unittest import os import time from HTMLTestRunner import HTMLTestRunner # 这里参考https://blog.csdn.net/notHavaBug/article/details/115457523?spm=1001.2014.3001.5502 import readConfig from common.myLog import MyLog from common.configEmail import Email logger = MyLog.get_log().logger myConfig = readConfig.ReadConfig() class RunAll: def __init__(self): self.caseList = [] pass def set_case_list(self): # 读取caselist.txt文档获取用例路径 caselist_file = os.path.join(readConfig.localDir, 'caselist.txt') fb = open(caselist_file, 'r') for value in fb.readlines(): data = str(value) # 忽略被注释掉的用例路径 if data != '' and not data.startswith('#'): self.caseList.append(data.replace('\n', '')) def set_case_suit(self): self.set_case_list() # 创建测试套件 test_suite = unittest.TestSuite() suite_model = [] for case in self.caseList: # 根据'/'分割文件夹、文件名 case_url = case.split('/') # 拼接路径 case_file = os.path.join(os.getcwd(), 'testCase', case_url[0]) # 划重点!这个地方不要用unittest.defaultTestLoader.discover会因为初始化问题只能执行一次,因为这个我真的找了一整天的原因!!! test_loader = unittest.TestLoader() # 自动执行所有测试用例 # pattern支持模糊匹配,例如login下有很多测试用例且都是以test开头,不想一个个都写在caselist.txt中,可以在caselist.txt中只写文件夹名,这里直接写pattern='test*.py' discover = test_loader.discover(case_file, pattern=case_url[1] + '.py', top_level_dir=None) suite_model.append(discover) if len(suite_model) > 0: for suite in suite_model: for test_name in suite: test_suite.addTest(test_name) else: return None return test_suite def run(self): flag = 0 try: suit = self.set_case_suit() if suit is not None: logger.info('*****************开始测试***************') date = time.strftime('%Y%m%d-%H%M%S', time.localtime()) # 拼接html报告存放路径 resultPath = os.path.join(readConfig.localDir, 'result', date + '_test.html') fp = open(resultPath, 'wb') # 生成html测试报告 runner = HTMLTestRunner(stream=fp, title='测试报告', description=date + '测试用例执行情况') runner.run(suit) flag = 1 else: logger.info('没有测试用例') except Exception as e: print(e) logger.error(str(e)) finally: logger.info('*****************测试结束***************') on_off = myConfig.get_email('on_off') if int(on_off) == 1 and flag == 1: # 调用写好email类发送邮件 Email().send_email(resultPath) logger.info('已发送邮件') if __name__ == '__main__': RunAll().run() 结果展示 日志文件 在这里插入图片描述 在这里插入图片描述自动生成的html测试报告 报告中的文件描述都来自与我在每个类和用例下自己写的注释 在这里插入图片描述

在这里插入图片描述 3. 发送邮件

在这里插入图片描述

总结

如有不对之处欢迎指出 本文参考文献:https://www.cnblogs.com/wangxiaoqun/p/6924797.html



【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有