【Pyhton】【接口自动化】利用pandas进行数据校验 您所在的位置:网站首页 自动化数据报表怎么做的 【Pyhton】【接口自动化】利用pandas进行数据校验

【Pyhton】【接口自动化】利用pandas进行数据校验

#【Pyhton】【接口自动化】利用pandas进行数据校验| 来源: 网络整理| 查看: 265

20211226更新:补上具体实现(这里附上的代码是以前在家写的代码,拿到公司用的时候发现代码里有个问题 进行了修复,然后加了个ifSortByColumn(按列名对列排序)和ifTransToSameTypeToCompare(转换为相同数据类型对比);因为公司的代码不运行带出,也不记得怎么改的了;如果要使用这个,遇到了可以自己改改)

做接口自动化,重要的一点就是检查接口返回的数据是否正确,及对比接口返回的数据与我们从数据库中查询到的数据是否相符;

初学者可能会想到将将对应数据取出逐一对比,但这样代码较多且可能会出现些奇奇怪怪的问题;

将数据处理成dataframe,使用pandas的assert_frame_equal可以快速 优雅的进行数据校验

# coding:utf-8 # author:CSDN-在墙角蹲着画圈圈 import jsonpath import copy from robot.api.deco import keyword from pprint import pprint def getDataByJsonpath(data,path=None,return_copy=False): """ 根据jsonpath或字段名查找数据 @param data: 待处理数据 @param path: jsonpath或字段名 @param return_copy: 是否返回副本,True/False;True-返回的list中是原数据的引用;False-返回的是原数据的副本 @return:list """ if not path: pass elif path.startswith('$'): data = jsonpath.jsonpath(data,path) else: data = jsonpath.jsonpath(data,'$..'+path) if return_copy: data = copy.deepcopy(data) return data @keyword(name='铺平报文结构') def spreadData(data,path,fieldNm,deal_on_copy=False): """ 铺平报文结构 @param data:原数据 @param path:待处理数据jsonpath(或字段名) @param deal_on_copy:是否在副本上处理,True/False;False-不会影响原数据 @return:处理后的数据 示例: example = {‘data’: [ {'a':{'num':12}}, {'b':{'num':23}} ] } spreadData(example,'data','name') ==> return: [ {'name':'a','num':12}, {'name':'b','num':23} ] """ data = getDataByJsonpath(data,path,deal_on_copy) if data: data = data[0] for idx,item in enumerate(data): for k,v in item.items(): v[fieldNm] = k data[idx] = v return data @keyword(name="jsonKey大小写转换") def keysToUpperOrLower(data,to='U'): """ 转换json的key为大写或者小写 :param data: json数据 :param to: U-将key转为大写,L-将key转为小写 :return: 处理后的json数据 """ to = to.upper() if to == 'U': action = 'key.upper()' elif to == 'L': action = 'key.lower()' else: raise ValueError('参数to入参错误,需传入U或L') if isinstance(data,dict): return {eval(action):keysToUpperOrLower(value,to) for key,value in data.items()} elif isinstance(data,list): return [keysToUpperOrLower(cell,to) for cell in data] else: return data @keyword(name="jsonKey替换") def replaceJsonKey(data,old:str,new:str): """ 替换json的key或key中特定字符为新key/字符 :param data:json数据 :param old:旧key :param new:新key :return:处理后的json数据 """ if isinstance(data,dict): return {key.replace(old,new):replaceJsonKey(value) for key,value in data.items()} elif isinstance(data,list): return [replaceJsonKey(cell,old,new) for cell in data] else: return data @keyword(name="json数据替换") def replaceJsonDataValue(data, old, new): """ json数据替换 :param data:json数据 :param old: 待替换value :param new: 替换后的目标值 :return: 处理后的json数据 """ if not isinstance(old, list): old =[old,] if isinstance(data,dict): data_tmp1 = {key:value for key, value in data.items() if value in old} data.update(data_tmp1) data = {key: replaceJsonDataValue(value, old, new) for key, value in data.items()} return data elif isinstance(data,list): return [replaceJsonDataValue(cell, old, new) for cell in data] else: if data in old: return new else: return data @keyword(name="json数据排序") def sortJsonData(data,reverse=False,key=None): """ json数据中的key,list数据进行排序 :param data: json数据 :param reverse: 是否倒序排序,默认值False :param key:list排序表达式 :return:处理后的数据 """ if isinstance(data,dict): keys = list(data.keys()) keys.sort(reverse=reverse) return {dataKey:sortJsonData(data[dataKey],reverse=reverse,key=key) for dataKey in keys} elif isinstance(data,list): try: if isinstance(key,str): data.sort(key=eval(key), reverse=reverse) else: data.sort(key=key,reverse=reverse) except (TypeError,KeyError): pass return data else: return data @keyword(name="jsonKey剔除") def dropJsonKey(data,keys:list=[],dropKeyByValueExpr:str=None): """ jsonKey剔除 :param data: json数据 :param keys: 需要剔除的key,list :param dropKeyByValue: 对value满足条件的key进行剔除,例如:传入value=1,将剔除value=1的key(适用于value为空 接口不返回该key的情况,对数据库查出数据进行相应处理) :return:处理后的数据吗 """ if dropKeyByValueExpr and(not isinstance(dropKeyByValueExpr,str) or not dropKeyByValueExpr.startswith("value")): raise ValueError("dropKeyByValueExpr入参需为value值判断表达式") if isinstance(data,dict): data = {key: dropJsonKey(value, keys, dropKeyByValueExpr) for key,value in data.items() if key not in keys} if dropKeyByValueExpr: items = [] for key,value in data.items(): try: if not eval(dropKeyByValueExpr.replace("==","=").replace("=","==")): items.append((key,value)) except (TypeError,KeyError): items.append((key, value)) data = dict(items) return data elif isinstance(data,list): return [dropJsonKey(cell, keys, dropKeyByValueExpr) for cell in data] else: return data def getListData(data,index,default=None): try: return data[index] except: return default @keyword(name='根据key替换value') def replaceJosnDataByKey(data,key,value): if isinstance(data,dict): for item in data.items(): if item[0] == key: data[item[0]] = value else: data[item[0]] = replaceJosnDataByKey(data[item[0]],key,value) return data elif isinstance(data,list): return [replaceJosnDataByKey(item,key,value) for item in data] else: return data @keyword(name='替换报文参数') def replaceJonsDatas(data,replace:dict=None): """ :param data: 原始报文 :param replace: 待替换的参数dict :return:替换参数值后的报文 """ if not replace: return data else: for item in replace.items(): replaceJosnDataByKey(data,item[0],item[1]) return data @keyword(name='合并json') def mergeJson(json1,json2,**kwargs): """合并json报文""" return {**json1,**json2,**kwargs} if __name__ == '__main__': # data = {'key1_1': 11, # 'key1_2': 12, # 'key1_3': [131, 132, 133], # 'key1_4': {'key2_1': 21, # 'key2_2': [211, 212, 213], # 'key2_3': {'key3_1': 31, 'key3_2': [311, 312, 313]}}, # 'key1_5': [{'key2_4': 24, 'key2_5': 25}, {'key2_4': 26, 'key2_5': 27}], # 'key1_6':[{'华为':{''}},{}] # } example = {'data': [ {'a': {'num': 12}}, {'b': {'num': 23}} ] } data = spreadData(example,'data','name',deal_on_copy=False) print(data) print(example) # coding:utf-8 # author:CSDN-在墙角蹲着画圈圈 from robot.api.deco import keyword from dataDeal import * import pysnooper import pandas as pd from pandas.testing import assert_frame_equal from numpy import dtype from pysnooper import snoop @snoop(max_variable_length=3000) @keyword(name="校验数据一致") def assertDataEquals(data1, data2, keys1=None, keys2=None, exceptKeys=None, transKeyToUpperOrLower=None, replaceKey:list=None, sortData:list=None, compareAsSameType=True): """ 校验数据是否一致(支持单个字符串/数值/list/dict数据对比),如果数据不一致将会抛出AssertionError :param data1:待比较数据1 :param data2:待比较数据2 :param keys1:待比较数据1中需要对比的key,只筛选第一层数据 :param keys2:待比较数据2中需要对比的key,只筛选第一层数据 :param exceptKeys:不参与对比的key,完全遍历过滤 :param transKeyToUpperOrLower:json key大写或小写,默认值None-不处理,U-大写,L-小写 :param replaceKey:对json key进行replace处理,默认None-不出来;示例:传入["-","_"]会将所有key中的-替换为_ :param sortData:对数据进行排序,默认None-不处理:示例传入[True,'lambda x:x["No"]'] 表示对json中key依照reverse=True排序,并对list的数据依照reverse=True,key=lambda x:x["No"]进行排序 :param compareAsSameType:是否将数据转为相同类型进行比较,默认值True(开启此功能,1.0,1,“1”,"1.00"会视作相同) :return: 无返回值 """ if type(data1) == type(data2) == str: data1 = [data1,] data2 = [data2,] if replaceKey or transKeyToUpperOrLower: if replaceKey: old, new = replaceKey data1 = replaceKey(data1, old, new) data2 = replaceKey(data2, old, new) exceptKeys = [cell.replace(old,new) for cell in exceptKeys] else: data1 = keysToUpperOrLower(data1, transKeyToUpperOrLower) data2 = keysToUpperOrLower(data2, transKeyToUpperOrLower) if transKeyToUpperOrLower.upper() == 'U': exceptKeys = [cell.upper() for cell in exceptKeys] else: exceptKeys = [cell.lower() for cell in exceptKeys] return assertDataEquals(data1, data2, keys1, keys2, exceptKeys, None, None) if exceptKeys: data1 = dropJsonKey(data1,exceptKeys) data2 = dropJsonKey(data2,exceptKeys) if sortData: reverse = getListData(sortData,0,None) key = getListData(sortData,1, None) data1 = sortJsonData(data1,reverse,key) data2 = sortJsonData(data2, reverse, key) df1 = pd.DataFrame(data1,keys1) df2 = pd.DataFrame(data2,keys2) if len(df1.columns) != len(df2.columns): raise AssertionError("待对比的两组数据键不一致:",df1.columns,df2.columns) if not compareAsSameType: assert_frame_equal(df1,df2) else: try: assert_frame_equal(df1, df2) except: dtype1 = df1.dtypes dtype2 = df2.dtypes dtypeStr1 = {key: str(value) for key, value in dtype1.items()} dtypeStr2 = {key: str(value) for key, value in dtype2.items()} dtypeSort = { "float64": 1, "int64": 2, "object": 3} dtypes = {} try: for key, value in dtypeStr1.items(): if dtypeSort.get(dtypeStr1[key], 0)


【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

      专题文章
        CopyRight 2018-2019 实验室设备网 版权所有