Python3+unittest+request 实现接口自动化测试 (完整简单案例) | 您所在的位置:网站首页 › 为什么要对接口进行测试 › Python3+unittest+request 实现接口自动化测试 (完整简单案例) |
Python3+unittest+request 实现接口自动化测试 (完整简单案例)
前言一、准备工作1. 建好项目及相关文件夹2. 准备好相关文档
二、代码部分1. readConfig.py读取config.ini配置文件2. 创建公用类3. 测试用例3. 执行所有用例
结果展示总结
前言
小白个人研究成果,仅供参考 一、准备工作 1. 建好项目及相关文件夹 common - 通用类,用于存放所有可以共用的类,例如写入日志、连接数据库,连接接口等result - 用于存放所有结果,例如生成的日志文件以及生成的html测试报告testCase - 存放所有测试用例testFile - 存放相关文档没有存放在文件夹里面的还有 caselist.txt - 存放所有要执行的用例的路径 config.ini - 配置文件 readConfig.py - 读取配置文件config.ini runAll.py - 测试入口![]() testFile 文件夹下: case_demo.xls 用于存放所有要执行测试用例路径(这个可以后面写,但先讲) 内容如下: 包含 case_name:用例名称、method:请求方式,url:接口路径,params:参数,code:响应码, meaasge:响应内容![]() 最外层文件: caselist.txt 存放要执行的测试用例路径 创建readConfig.py #!/usr/bin/env python # -*- coding:utf-8 -*- import os import codecs import configparser # 获取当前py文件路径的上一层 localDir = os.path.split(os.path.realpath(__file__))[0] # 拼接成config.ini完整路径 configPath = os.path.join(localDir, 'config.ini') class ReadConfig: """读取config.ini文件""" def __init__(self): # 打开文件 config_file = open(configPath, encoding='UTF-8') data = config_file.read() # 判断是否带BOM文件 if data[:3] == codecs.BOM_UTF8: data = data[3:] file = codecs.open(configPath, 'w', encoding='UTF-8') # 改写文件 file.write(data) file.close() config_file.close() # 创建configparser实例 self.cf = configparser.ConfigParser() # 读取配置文件 self.cf.read(configPath, encoding='UTF-8') # 获取配置文件email的参数 def get_email(self, name): return self.cf.get('EMAIL', name) # 获取配置文件http的参数 def get_http(self, name): return self.cf.get('HTTP', name) # 获取配置文件database的参数 def get_database(self, name): return self.cf.get('DATABASE', name) # 获取配置文件database_case的参数 def get_database_case(self, name): return self.cf.get('DATABASE_CASE', name) 2. 创建公用类common文件夹下(以下几个py文件我就不分先后顺序介绍了): configDB.py 操作数据库 #!/usr/bin/env python # -*- coding:utf-8 -*- import MySQLdb # python3安装命令:pip3 install Mysqlclien from readConfig import ReadConfig from common.myLog import MyLog myConfig = ReadConfig() class MyDB: """数据库""" global host, name, pwd, port, database, config # 获取配置文件中的参数 host = myConfig.get_database("host") name = myConfig.get_database("name") pwd = myConfig.get_database("pwd") port = myConfig.get_database("port") database = myConfig.get_database("database") config = { 'host': str(host), 'user': name, 'password': pwd, 'port': int(port), 'db': database, 'charset': 'utf8' } def __init__(self): self.logger = MyLog.get_log().logger self.db = None self.cursor = None def connect_db(self): """连接数据库""" try: self.db = MySQLdb.connect(**config) self.cursor = self.db.cursor() print('数据库已连接成功') except ConnectionError as e: self.logger.error(str(e)) def execute_sql(self, sql, params): """执行sql并返回操作游标""" try: self.connect_db() self.cursor.execute(sql, params) self.db.commit() return self.cursor except RuntimeError as e: self.db.rollback() def get_all(self, cursor): """获取全部数据""" value = cursor.fetchall() return value def get_one(self, cursor): """获取第一条数据""" value = cursor.fetchone() return value def close_db(self): """关闭数据库""" self.db.close() print('数据库已关闭') configDBCase.py 还是操作数据库,但我这里是另一个数据库专门处理测试用例的(本文读取测试用例一共用了两种方式 1-读取数据库,2-读取xls文档) #!/usr/bin/env python # -*- coding:utf-8 -*- import MySQLdb # python3安装命令:pip3 install Mysqlclien from readConfig import ReadConfig from common.myLog import MyLog myConfig = ReadConfig() class MyDBCase: """测试用例数据库""" global host, name, pwd, port, database, config # 获取配置文件中的参数 host = myConfig.get_database_case("host") name = myConfig.get_database_case("name") pwd = myConfig.get_database_case("pwd") port = myConfig.get_database_case("port") database = myConfig.get_database_case("database") config = { 'host': str(host), 'user': name, 'password': pwd, 'port': int(port), 'db': database, 'charset': 'utf8' } def __init__(self): """初始化数据""" self.logger = MyLog.get_log().logger self.db = None self.cursor = None def connect_db(self): """连接数据库""" try: self.db = MySQLdb.connect(**config) self.cursor = self.db.cursor() print('数据库已连接成功') except ConnectionError as e: self.logger.error(str(e)) def execute_sql(self, sql, params): """执行sql并返回操作游标""" try: self.cursor.execute(sql, params) self.db.commit() return self.cursor except RuntimeError as e: self.db.rollback() def get_all(self, cursor): """返回全部数据""" value = cursor.fetchall() return value def get_one(self, cursor): """返回第一条数据""" value = cursor.fetchone() return value def close_db(self): """关闭数据库连接""" self.db.close() print('数据库已关闭') configEmail.py 处理邮件 #!/usr/bin/env python # -*- coding:utf-8 -*- import time import yagmail from readConfig import ReadConfig from common.myLog import MyLog myConfig = ReadConfig() class Email: """邮件处理""" def __init__(self): global smtp, sender, recerives, code, title, content # 读取配置文件中的数据 smtp = myConfig.get_email('smtp') sender = myConfig.get_email('sender') recerives = myConfig.get_email('recerives') code = myConfig.get_email('code') title = myConfig.get_email('subject') content = myConfig.get_email('content') # 获取当前时间 date = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()) self.subject = title + date self.log = MyLog.get_log() self.logger = self.log.logger pass def send_email(self, accessory): """发送邮件""" # 参数:发送者,邮件授权码(不是密码哦,具体参考https://baijiahao.baidu.com/s?id=1552315463915496&wfr=spider&for=pc),邮箱服务器地址 yag = yagmail.SMTP(user=sender, password=code, host=smtp) # 发送邮件,参数:接收者,标题,内容,附件 yag.send(recerives, self.subject, content, accessory) # 关闭 yag.close() configHttp.py 请求接口 #!/usr/bin/env python # -*- coding:utf-8 -*- import requests import json # 引入写好的几个类 from readConfig import ReadConfig from common.myLog import MyLog # 实例化ReadConfig类 myConfig = ReadConfig() class ConfigHttp: """请求接口类""" # 全局变量token token = {} def __init__(self): """初始化""" global timeout timeout = myConfig.get_http('timeout') self.logger = MyLog.get_log().logger self.url = '' self.headers = {} self.params = {} self.data = {} self.files = {} pass def set_headers(self, headers): """设置参数""" self.headers = headers def set_params(self, params): self.params = params def set_data(self, data): self.data = data def set_files(self, files): self.files = files def set_url(self, url): self.url = url def get_requests(self): try: return requests.get(url=myConfig.get_http('url') + self.url, params=self.params, headers=self.headers, timeout=float(timeout)) except TimeoutError: self.logger.error('请求超时') return None def post_requests(self): """post请求方式""" try: return requests.post(url=myConfig.get_http('url') + self.url, json=self.data, files=self.files, headers=self.headers, timeout=float(timeout)) except TimeoutError: self.logger.error('请求超时') return None myLog.py 处理日志 #!/usr/bin/env python # -*- coding:utf-8 -*- import os import time import logging import threading import readConfig logPath = '' resultPath = '' localDir = '' class Log: """日志""" def __init__(self): """初始化""" global logPath, resultPath, localDir # 获取readConfig.py中的localDir参数 localDir = readConfig.localDir # 拼接result文件夹完整路径 resultPath = os.path.join(localDir, "result") # 判断文件是否存在,否则创建 if not os.path.exists(resultPath): os.mkdir(resultPath) # 创建日志文件 date = time.strftime('%Y%m%d', time.localtime()) logPath = os.path.join(resultPath, date) if not os.path.exists(logPath): os.mkdir(logPath) # 创建logger实例 self.logger = logging.getLogger() # 定义logger级别 self.logger.setLevel(logging.INFO) # 自定义Handle日志规则,如果是logging.StreamHandler则是输出到控制台,FileHandle则是输出到文件 handler = logging.FileHandler(os.path.join(logPath, 'output.log')) # 创建日志格式实例:%(asctime)s:打印日志的时间, %(levelname)s:打印日志级别的名称, %(message)s:打印日志信息 formater = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') # handler自定义格式 handler.setFormatter(formater) # logger日志加载handle实例 self.logger.addHandler(handler) pass class MyLog: log = None # 创建线程锁 threadLock = threading.Lock() def __init__(self): pass # 申明静态方法,无需实例化 @staticmethod def get_log(): if MyLog.log is None: # 开启锁 MyLog.threadLock.acquire() # 调用Log类 MyLog.log = Log() # 释放锁 MyLog.threadLock.release() return MyLog.log pass readCase.py 读取xml、xls文档 #!/usr/bin/env python # -*- coding:utf-8 -*- import os from xlrd import open_workbook # 下载参考:https://blog.csdn.net/ZMJ_566/article/details/89360918 from common.configHttp import ConfigHttp from common.myLog import MyLog import readConfig from xml.etree import ElementTree localConfigHttp = ConfigHttp() logger = MyLog.get_log().logger database = {} class ReadCase: """读取文档""" def get_xls(self, xls_name, sheet_name): """读取xls文档。xls_name:文档名称,sheet_name:工作表sheet名称 """ cls = [] # 拼接地址 xlsPath = os.path.join(readConfig.localDir, 'testFile', xls_name) # 打开文档 file = open_workbook(xlsPath) # 根据工作表名获取工作表信息 sheet = file.sheet_by_name(sheet_name) # 获取工作表总行数 nrows = sheet.nrows # 遍历并忽略第一行表头 for i in range(1, nrows): cls.append(sheet.row_values(i)) return cls def get_xml(self): """读取xml文档""" if len(database) == 0: # 拼接地址 sql_path = os.path.join(readConfig.localDir, 'testFile', 'sql.xml') # 解析xml tree = ElementTree.parse(sql_path) # 遍历所有database标签 for db in tree.findall("database"): # 获取标签的name属性 db_name = db.get("name") table = {} for tb in db: table_name = tb.get("name") sql = {} for data in tb: sql_id = data.get("id") sql[sql_id] = data.text table[table_name] = sql database[db_name] = table def get_sql(self, database_name, table_name, sql_id): """根据数据库名、表名、sql_id获取sql""" try: self.get_xml() sql = database.get(database_name).get(table_name).get(sql_id) return sql except RuntimeError as e: logger.info('%s:获取的数据库名或表名或sql_id不存在', e) 3. 测试用例数据库表: CREATE TABLE `t_case` ( `id` int(11) NOT NULL AUTO_INCREMENT, `name` varchar(255) NOT NULL COMMENT '用例名称', `method` varchar(255) NOT NULL COMMENT '方法', `url` varchar(255) NOT NULL COMMENT '路径', `param` varchar(255) NOT NULL COMMENT '参数', `code` int(255) DEFAULT NULL COMMENT '编码', `message` varchar(255) DEFAULT NULL COMMENT '消息', PRIMARY KEY (`id`) ) ENGINE=MyISAM AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4;表内容: runAll.py #!/usr/bin/env python # -*- coding:utf-8 -*- import unittest import os import time from HTMLTestRunner import HTMLTestRunner # 这里参考https://blog.csdn.net/notHavaBug/article/details/115457523?spm=1001.2014.3001.5502 import readConfig from common.myLog import MyLog from common.configEmail import Email logger = MyLog.get_log().logger myConfig = readConfig.ReadConfig() class RunAll: def __init__(self): self.caseList = [] pass def set_case_list(self): # 读取caselist.txt文档获取用例路径 caselist_file = os.path.join(readConfig.localDir, 'caselist.txt') fb = open(caselist_file, 'r') for value in fb.readlines(): data = str(value) # 忽略被注释掉的用例路径 if data != '' and not data.startswith('#'): self.caseList.append(data.replace('\n', '')) def set_case_suit(self): self.set_case_list() # 创建测试套件 test_suite = unittest.TestSuite() suite_model = [] for case in self.caseList: # 根据'/'分割文件夹、文件名 case_url = case.split('/') # 拼接路径 case_file = os.path.join(os.getcwd(), 'testCase', case_url[0]) # 划重点!这个地方不要用unittest.defaultTestLoader.discover会因为初始化问题只能执行一次,因为这个我真的找了一整天的原因!!! test_loader = unittest.TestLoader() # 自动执行所有测试用例 # pattern支持模糊匹配,例如login下有很多测试用例且都是以test开头,不想一个个都写在caselist.txt中,可以在caselist.txt中只写文件夹名,这里直接写pattern='test*.py' discover = test_loader.discover(case_file, pattern=case_url[1] + '.py', top_level_dir=None) suite_model.append(discover) if len(suite_model) > 0: for suite in suite_model: for test_name in suite: test_suite.addTest(test_name) else: return None return test_suite def run(self): flag = 0 try: suit = self.set_case_suit() if suit is not None: logger.info('*****************开始测试***************') date = time.strftime('%Y%m%d-%H%M%S', time.localtime()) # 拼接html报告存放路径 resultPath = os.path.join(readConfig.localDir, 'result', date + '_test.html') fp = open(resultPath, 'wb') # 生成html测试报告 runner = HTMLTestRunner(stream=fp, title='测试报告', description=date + '测试用例执行情况') runner.run(suit) flag = 1 else: logger.info('没有测试用例') except Exception as e: print(e) logger.error(str(e)) finally: logger.info('*****************测试结束***************') on_off = myConfig.get_email('on_off') if int(on_off) == 1 and flag == 1: # 调用写好email类发送邮件 Email().send_email(resultPath) logger.info('已发送邮件') if __name__ == '__main__': RunAll().run() 结果展示 日志文件![]() ![]() ![]()
如有不对之处欢迎指出 本文参考文献:https://www.cnblogs.com/wangxiaoqun/p/6924797.html |
CopyRight 2018-2019 实验室设备网 版权所有 |