您的位置:首页 > 汽车 > 时评 > 泰州智搜网络科技有限公司_永久免费安卓代理ip_网络营销和传统营销的区别_建一个网站大概需要多少钱

泰州智搜网络科技有限公司_永久免费安卓代理ip_网络营销和传统营销的区别_建一个网站大概需要多少钱

2024/11/16 16:50:13 来源:https://blog.csdn.net/chengxuyuznguoke/article/details/143647522  浏览:    关键词:泰州智搜网络科技有限公司_永久免费安卓代理ip_网络营销和传统营销的区别_建一个网站大概需要多少钱
泰州智搜网络科技有限公司_永久免费安卓代理ip_网络营销和传统营销的区别_建一个网站大概需要多少钱

介绍分享一个接口自动化框架搭建方法 (pytest+request+allure),这个方案是由 xpcs  同学在TesterHome社区网站的分享。

写在前面

去年11月被裁,到现在还没上岸,gap 半年了。上岸无望,专业技能不能落下,花了两三天时间,把之前工作中搭建使用的接口自动化框架,重写了一套。

楼主代码菜鸡,代码可能比较 low - -,希望通过本次分享,给社区里想写接口自动化的同学一些借鉴,也希望社区里的大神多给一些优化建议,大家互帮互助,共同进步~

  • 框架基于 python 语言,框架使用 pytest,报告使用 allure

  • 支持多环境运行,通过命令行传参区分

  • 支持多进程跑测,用例需独立无依赖,conftest.py 中包含多进程下只运行一次的 fileLock fixture

  • 支持数据库连接单例,一个库在一个进程下只会建立一次连接

  • 支持 mysql、redis 操作

  • 支持 get、post、put、delete 请求方法,请求是通过用例的请求头 Content-Type 来区分,是使用 params、data 还是 json 传参

  • 支持参数化数据驱动,用参数化参数字典,去更新通用参数字典,更新后发起请求

以下使用 windows 环境

conda 配置和新建工程:

安装conda
https://www.anaconda.com/download/success

新建工程,在 pycharm 中新建 conda 虚拟环境

图片

安装 allure 报告

https://repo.maven.apache.org/maven2/io/qameta/allure/allure-commandline/2.29.0/

图片

解压并配置环境变量
D:\allure\allure-2.29.0

图片

图片

cmd 命令行,验证 allure 安装

图片

设置 pycharm 文件编码 UTF-8

图片

依赖安装

pycharm 命令行运行
激活虚拟环境
conda activate pytest_api_auto
安装 python3.8
conda install python=3.8
安装依赖包
pip install requests
pip install jsonpath
pip install pytest
pip install allure-pytest
pip install pytest-sugar
pip install pytest-xdist
pip install pytest-assume
pip install pymysql
pip install redis
pip install faker
pip install filelock

目录划分

图片

以下是源码部分

封装log日志工具类

 
  1. # common/log_util.pyimport loggingimport os# create loggerlog = logging.getLogger("pytest_api_auto")log.setLevel(logging.INFO)# create file handler

  2. # mode 默认为a追加模式,如果修改为w为覆盖模式,多进程运行会出现日志缺失和错乱

  3. # 获取项目根目录拼接,日志会存在工程根目录pytest.log 每次运行追加写入fh = logging.FileHandler(os.path.join(os.path.dirname(os.path.dirname(__file__)), "pytest.log"),

  4. mode='a', encoding='UTF-8')fh.setLevel(logging.INFO)# create stream handlersh = logging.StreamHandler(stream=None)# create formatterfmt = "%(asctime)s - %(filename)s - %(funcName)s - %(lineno)d - %(levelname)s - %(message)s"formatter = logging.Formatter(fmt)# add handler and formatter to loggerfh.setFormatter(formatter)sh.setFormatter(formatter)log.addHandler(fh)log.addHandler(sh)

封装环境配置

 
  1. config/env.pyfrom common.log_util import logclass ENV:

  2. # 环境信息:test 测试 prod 准生产 # 从pytest命令行获取 info = None# 测试环境服务域名配置class UrlTestConfig:

  3. api_backend = "http://api_backend.cn:8899"# 准生产环境服务域名配置class UrlProdConfig:

  4. api_backend = "http://api_backend.cn:8899"def get_url(server_name):

  5. if ENV.info == "test":

  6. url = getattr(UrlTestConfig, server_name)

  7. log.info(f"测试环境获取服务域名 - {server_name} : {url}")

  8. return url

  9. elif ENV.info == "prod":

  10. url = getattr(UrlProdConfig, server_name)

  11. log.info(f"准生产环境获取服务域名 - {server_name} : {url}")

  12. return url

  13. else:

  14. raise Exception("--env 环境信息有误")

封装 mysql 操作工具类

 
  1. # common/mysql_util.py

  2. # 装饰器,同一个mysql数据库只建立一次连接import pymysqlfrom time import sleepfrom common.log_util import log# 装饰器,同一个mysql数据库只建立一次连接def decorate_single(cls):

  3. connect_list = {}

  4. def wrapper(*args, **kwargs):

  5. nonlocal connect_list

  6. db_name = args[0]["db"]

  7. if db_name not in connect_list:

  8. connect_list[db_name] = cls(*args, **kwargs)

  9. log.info(f"建立mysql连接并返回 - {db_name}")

  10. else:

  11. log.info(f"mysql连接已建立,直接返回 - {db_name}")

  12. return connect_list[db_name]

  13. return wrapper@decorate_singleclass MySql:

  14. def __init__(self, db_config: dict):

  15. """

  16. :params: db_config 数据库配置 类型为字典

  17. """

  18. # 数据库配置 # autocommit: True 选项很关键,如果不设置,新增数据无法查出 # mysql默认数据引擎是innodb 默认数据隔离级别重复读,如果事务不提交,那么每次查询,查询都是同一块数据快照 self.conn = None

  19. while True:

  20. try:

  21. self.conn = pymysql.connect(**db_config)

  22. break

  23. # 数据库连接,偶尔会连接不上 # 报错 pymysql.err.OperationalError: (2013, 'Lost connection to MySQL server during query') # 解决办法,就是重新连接 except pymysql.err.OperationalError:

  24. log.warning("连接失败,可能环境不稳定,重新连接!")

  25. sleep(1)

  26. except Exception as e:

  27. log.warning("获取mysql连接失败!请检查数据库配置或网络连接")

  28. raise e

  29. def fetchone(self, sql_str: str):

  30. """

  31. :params: sql_str 数据库sql

  32. :return: 返回查询结果的一条记录,类型是字典; 若未查询到,则返回None

  33. """

  34. try:

  35. with self.conn.cursor(pymysql.cursors.DictCursor) as cursor:

  36. log.info(f"执行sql: {sql_str}")

  37. cursor.execute(sql_str)

  38. data = cursor.fetchone()

  39. log.info(f"sql执行结果: {data}")

  40. return data

  41. except Exception as e:

  42. log.warning("执行sql失败!")

  43. raise e

  44. def fetchall(self, sql_str: str):

  45. """

  46. :params: sql_str 数据库sql

  47. :return: 返回查询结果的全部记录,类型是列表,列表元素为字典

  48. """

  49. try:

  50. with self.conn.cursor(pymysql.cursors.DictCursor) as cursor:

  51. log.info(f"执行sql: {sql_str}")

  52. cursor.execute(sql_str)

  53. data = cursor.fetchall()

  54. log.info(f"sql执行结果: {data}")

  55. return data

  56. except Exception as e:

  57. log.warning("执行sql失败!")

  58. raise e

  59. def execute_dml(self, sql_str):

  60. """

  61. function: 执行insert、update、delete

  62. :param sql_str 数据库sql

  63. :return: 无返回

  64. """

  65. try:

  66. with self.conn.cursor(pymysql.cursors.DictCursor) as cursor:

  67. log.info(f"执行sql: {sql_str}")

  68. data = cursor.execute(sql_str)

  69. # 提交操作,我们配置连接是自动提交,所以下面提交步骤也可省略 self.conn.commit()

  70. log.info(f"sql执行结果: {data}")

  71. except Exception as e:

  72. log.warning("执行sql失败!")

  73. raise e

  74. def close(self):

  75. """

  76. function:关闭数据库连接

  77. params: conn 数据库连接

  78. """

  79. self.conn.close()

封装 mysql 配置和连接获取

 
  1. # config/mysql.pyfrom common.mysql_util import MySqlfrom config.env import ENVfrom common.log_util import log# 数据库连接配置class MysqlTestConfig:

  2. """Mysql测试环境配置"""

  3. api_auto = {'host': 'localhost', 'port': 3306,

  4. 'db': 'api_auto', 'user': 'root',

  5. 'password': 'root', 'autocommit': True

  6. }class MysqlProdConfig:

  7. """Mysql准生产环境配置"""

  8. api_auto = {'host': 'localhost', 'port': 3306,

  9. 'db': 'api_auto', 'user': 'root',

  10. 'password': 'root', 'autocommit': True

  11. }def get_mysql_conn(db_name):

  12. if ENV.info == "test":

  13. log.info("测试环境建立mysql连接 - " + db_name)

  14. return MySql(getattr(MysqlTestConfig, db_name))

  15. elif ENV.info == "prod":

  16. log.info("准生产环境建立mysql连接 - " + db_name)

  17. return MySql(getattr(MysqlProdConfig, db_name))

  18. else:

  19. raise Exception("--env 环境信息有误")

封装 redis 工具类

 
  1. # common/redis_util.pyimport redisfrom common.log_util import log# 装饰器,同一个redis只建立一次连接def decorate_single(cls):

  2. connect_list = {}

  3. def wrapper(*args, **kwargs):

  4. nonlocal connect_list

  5. host = args[0]["host"]

  6. if host not in connect_list:

  7. connect_list[host] = cls(*args, **kwargs)

  8. log.info(f"建立redis连接并返回 - {host}")

  9. else:

  10. log.info(f"redis连接已建立,直接返回 - {host}")

  11. return connect_list[host]

  12. return wrapper@decorate_singleclass Redis:

  13. def __init__(self, db_config):

  14. """

  15. :params: db_config 数据库配置 类型为字典

  16. """

  17. self.pool = redis.ConnectionPool(**db_config)

  18. self.rs = redis.Redis(connection_pool=self.pool)

  19. def del_key(self, key):

  20. """

  21. :param key: redis key str字符类型

  22. :return: 删除成功返回 True 否则 False

  23. """

  24. log.info(f"redis 删除key {key}")

  25. if self.rs.delete(key) == 1:

  26. log.info(f"key {key} 删除成功")

  27. return True

  28. else:

  29. log.warning(f"key: {key} 不存在!")

  30. return False

  31. def del_keys(self, keys_pattern):

  32. """

  33. :param keys_pattern: key通配符 str字符类型 ex: *name*

  34. :return:删除成功返回 True 否则 False

  35. """

  36. log.info(f"redis 删除keys 通配符 {keys_pattern}")

  37. keys = self.rs.keys(keys_pattern)

  38. if keys:

  39. log.info(f"redis 删除keys {keys}")

  40. for k in keys:

  41. self.rs.delete(k)

  42. log.info(f"keys {keys} 删除成功")

  43. return True

  44. else:

  45. log.warning("通配符未匹配到key!")

  46. return False

  47. def set(self, key, value, ex=8 * 60 * 60):

  48. """

  49. 操作str类型

  50. :param key: redis key str字符类型

  51. :param value: str字符类型

  52. :param ex: 数据超时时间,默认8小时

  53. return: 写入成功返回 True

  54. """

  55. log.info(f"redis str类型 数据写入 key: {key} value: {value}")

  56. return self.rs.set(key, value, ex=ex)

  57. def get(self, key):

  58. """

  59. 操作str类型

  60. :param key: redis key str字符类型

  61. :return: 获取到返回str字符类型 # 未获取到返回 None

  62. """

  63. data = self.rs.get(key)

  64. log.info(f"redis str类型 数据获取 key: {key} value: {data}")

  65. return data

  66. def lrange(self, key):

  67. """

  68. 操作list类型

  69. :param key: redis key str字符类型

  70. return: 获取到返回list列表类型 # 未获取到返回空列表 []

  71. """

  72. data = self.rs.lrange(key, 0, -1)

  73. log.info(f"redis list类型 数据获取 key: {key} values: {data}")

  74. return data

  75. def smembers(self, key):

  76. """

  77. 操作 set 集合

  78. :param key: redis key str字符类型

  79. return: 获取到返回set集合类型 # 未获取到返回空集合 set()

  80. """

  81. data = self.rs.smembers(key)

  82. log.info(f"redis set类型 数据获取 key: {key} values: {data}")

  83. return data

  84. def zrange(self, key):

  85. """

  86. 操作 zset 有序集合

  87. :param key: redis key str字符类型

  88. return: 获取到返回list列表类型 # 未获取到返回空列表 []

  89. """

  90. data = self.rs.zrange(key, 0, -1)

  91. log.info(f"redis zset类型 数据获取 key: {key} values: {data}")

  92. return data

  93. # hash 操作 hset hget 后续可扩展

  94. def close(self):

  95. """

  96. function:关闭数据库连接

  97. params: rs Redis对象

  98. """

  99. self.rs.close()

封装 redis 配置和连接获取

 
  1. # config/redis.pyfrom common.redis_util import Redisfrom config.env import ENVfrom common.log_util import logclass RedisTestConfig:

  2. api_backend = {'host': 'api_backend.cn', 'password': 'redis123',

  3. 'port': 6379, 'db': 0, 'decode_responses': True}class RedisProdConfig:

  4. api_backend = {'host': 'api_backend.cn', 'password': 'redis123',

  5. 'port': 6379, 'db': 0, 'decode_responses': True}def get_redis_conn(name):

  6. if ENV.info == "test":

  7. log.info("测试环境建立redis连接 - " + name)

  8. return Redis(getattr(RedisTestConfig, name))

  9. elif ENV.info == "prod":

  10. log.info("准生产环境建立redis连接 - " + name)

  11. return Redis(getattr(RedisProdConfig, name))

  12. else:

  13. raise Exception("--env 环境信息有误")

封装 requests 工具类

 
  1. # common/requests_util.pyimport requestsfrom common.log_util import logdef send_request(url, method, data, headers, **kwargs):

  2. """

  3. :param url: 请求域名 类型 str ex: http://xxx.com/path

  4. :param method: 请求方法 类型 str 暂时支持 get、post、put、delete

  5. :param data: 请求数据,类型 dict、list、str

  6. :param headers: 请求头,类型 dict

  7. :param kwargs: 扩展支持 files 上传文件、proxy 代理等

  8. :return:

  9. """

  10. if not url.startswith("http://") and not url.startswith("https://"):

  11. raise Exception("请求url缺少协议名")

  12. if method.lower() not in ("get", "post", "put", "delete"):

  13. raise Exception(f"暂不支持请求方法 - {method} - 可后续扩展")

  14. log.info("请求参数:")

  15. log.info(f"url: {url}")

  16. log.info(f"method: {method}")

  17. log.info(f"data: {data}")

  18. log.info(f"headers: {headers}")

  19. log.info(f"kwargs: {kwargs}")

  20. try:

  21. if "Content-Type" in headers.keys():

  22. # headers 包含传参类型 if headers["Content-Type"] in ("application/x-www-form-urlencoded", "multipart/form-data"):

  23. res = requests.request(url=url, method=method, data=data, headers=headers, timeout=30, **kwargs)

  24. elif headers["Content-Type"] == "application/json":

  25. res = requests.request(url=url, method=method, json=data, headers=headers, timeout=30, **kwargs)

  26. else: # 若非上面三种类型,默认使用json传参 text/html, text/plain等,可后续扩展验证 res = requests.request(url=url, method=method, json=data, headers=headers, timeout=30, **kwargs)

  27. else:

  28. # 请求头没指定传参类型Content-Type,则使用params传参,即在url中传参,如get请求 res = requests.request(url=url, method=method, params=data, headers=headers, timeout=30, **kwargs)

  29. except Exception as e:

  30. log.warning("请求发生异常!!!")

  31. raise e

  32. if res.status_code == 200:

  33. log.info("请求成功")

  34. log.info("响应参数:")

  35. log.info(f"{res.text}")

  36. else:

  37. log.warning(f"请求失败!!! 返回码不为200, 状态码为: {res.status_code}")

  38. log.warning(f"响应参数:")

  39. log.warning(f"text: {res.text}")

  40. log.warning(f"raw: {res.raw}")

  41. raise Exception("返回码不为200")

  42. try:

  43. # 返回为字典类型 return res.json()

  44. except requests.exceptions.JSONDecodeError:

  45. log.warning("响应参数不为json,返回响应 response对象")

  46. return res

封装用例数据解析

 
  1. # common/data_parser.pyfrom config.env import get_urlfrom common.request_util import send_requestimport jsonpathfrom common.log_util import logdef parser(server_name, data_dict, param=None, **kwargs):

  2. """

  3. :param server_name: env.py 中的服务域名 类型str ex: api_backend

  4. :param data_dict: test_xxx.py测试用例对应data.py中的接口请求数据字典 ex: api_backend["get_student"]

  5. :param param: data.py 参数化列表中一项中的data ex: api_backend["get_student"]["param_list"][0]["data"]

  6. :**kwargs: 扩展参数

  7. :return: 请求结果,如果响应是json类型返回dict,否则返回response对象

  8. """

  9. # 获取配置中的服务器域名,拼接path url = get_url(server_name) + data_dict["path"]

  10. method = data_dict["method"]

  11. headers = data_dict["headers"]

  12. data = data_dict["data"]

  13. # 参数化后发起请求,用参数化参数更新或替代通用参数 if param:

  14. if isinstance(data, dict) and isinstance(param, dict):

  15. # 如果通用参数为字典,参数化参数也为字典,使用参数化参数更新通用参数 ex: {"xx": "xx"} data.update(param)

  16. else:

  17. # 如果通用参数是字符串、列表(元素为字符、数字、字典),直接使用参数化参数据替换通用参数 ex: ["xx", "xx"] data = param

  18. res = send_request(url, method, data, headers, **kwargs)

  19. return resdef assert_res(res_dict, expect_dict):

  20. """

  21. :param res_dict: request请求返回的结果字典,类型 dict

  22. :param expect_dict: 预期结果字典, 类型 dict

  23. """

  24. if isinstance(res_dict, dict):

  25. log.info("开始断言")

  26. log.info(f"预期结果: {expect_dict}")

  27. # 遍历预期结果的key,使用jsonpath获取请求结果的value,与预期结果value比对 for k in expect_dict.keys():

  28. res_list = jsonpath.jsonpath(res_dict, '$..' + str(k)) # 返回列表 assert expect_dict[k] in res_list

  29. log.info("断言通过")

  30. else:

  31. log.warning("请求结果不为dict字典类型,跳过断言!")

封装 faker 模拟数据

 
  1. # common/faker.pyfrom faker import Fakerfrom common.log_util import logfake = Faker("zh_CN")def get_name():

  2. name = fake.name()

  3. log.info(f"faker 生成姓名: {name}")

  4. return namedef get_phone_number():

  5. phone_number = fake.phone_number()

  6. log.info(f"faker 生成手机号: {phone_number}")

  7. return phone_numberdef get_id_card():

  8. id_card = fake.ssn()

  9. log.info(f"faker 生成身份证号: {id_card}")

  10. return id_card

pytest.ini

 
  1. [pytest]addopts = -p no:warnings -vsmarkers =

  2. multiprocess: suppurt mutl-process execute cases

全局 conftest.py

 
  1. # conftest.pyimport pytestfrom common.log_util import logfrom filelock import FileLockimport jsonfrom config.env import ENVimport osimport allure# 自定义环境信息pytest命令行def pytest_addoption(parser):

  2. parser.addoption(

  3. "--env",

  4. action="store",

  5. default="test",

  6. help="set pytest running environment ex: --env=test --env=prod"

  7. )# 从pytest命令行获取环境信息@pytest.fixture(scope="session")def get_env(request):

  8. ENV.info = request.config.getoption("--env")

  9. log.info("运行环境: " + ENV.info)

  10. return ENV.info# 终结函数,最后执行@pytest.fixture(scope="session", autouse=True)def fixture_case_end(request):

  11. def case_end():

  12. log.info("测试结束")

  13. request.addfinalizer(case_end)@pytest.fixture(scope="session", autouse=True)# fixture 嵌套先执行获取环境信息get_env

  14. # 加入 tmp_path_factory worker_id 用于多进程执行 # 多进程运行,token只获取一次def fixture_get_token(get_env, tmp_path_factory, worker_id):

  15. # 单进程执行 if worker_id == "master":

  16. # 获取token token = {"token": "xpcs"}

  17. log.info("fixture_get_token master获取token %s" % token['token'])

  18. else:

  19. # 多进程执行 root_tmp_dir = tmp_path_factory.getbasetemp().parent

  20. fn = root_tmp_dir / "data.json"

  21. # 这里with里面的语句,理解为是被加锁的,同一时间只能有一个进程访问 with FileLock(str(fn) + ".lock"):

  22. if fn.is_file():

  23. # session_fixture 获取token已执行,直接从文件中读取token token = json.loads(fn.read_text())

  24. log.info("fixture_get_token slave使用token %s" % token['token'])

  25. else:

  26. token = {"token": "xpcs"}

  27. fn.write_text(json.dumps(token))

  28. log.info("fixture_get_token slave获取token %s" % token['token'])

  29. yield token['token']

  30. # session 结束后自动执行如下 log.info("session结束")# 用例失败自动执行钩子函数@pytest.hookimpl(tryfirst=True, hookwrapper=True)def pytest_runtest_makereport(item):

  31. # 获取钩子方法的调用结果 outcome = yield

  32. rep = outcome.get_result()

  33. # 仅仅获取用例call 执行结果是失败的情况, 不包含 setup/teardown if rep.when == "call" and rep.failed:

  34. mode = "a" if os.path.exists("failures") else "w"

  35. with open("failures", mode) as f:

  36. # let's also access a fixture for the fun of it if "tmpdir" in item.fixturenames:

  37. extra = " (%s)" % item.funcargs["tmpdir"]

  38. else:

  39. extra = ""

  40. f.write(rep.nodeid + extra + "\n")

  41. with allure.step("用例运行失败,可加入信息"):

  42. allure.attach("失败内容: ----xpcs----", "失败标题", allure.attachment_type.TEXT)

测试用例

 
  1. # line_of_business/service_name_api_backend/test_api_backend.pyimport pytestfrom time import sleepimport allurefrom common.data_parser import parser, assert_resfrom config.mysql import get_mysql_connfrom common.log_util import logfrom line_of_business_name.service_name_api_backend.data import api_backendfrom common.faker_util import get_name, get_id_card, get_phone_numberfrom config.redis import get_redis_conn@allure.feature("flask后端接口测试")class TestApiBackend:

  2. @classmethod

  3. def setup_class(cls):

  4. # 获取数据库连接,执行sql测试 log.info("setup_class")

  5. # 数据库连接根据db库名单例,相同库返回同一个连接 conn = get_mysql_conn("api_auto")

  6. conn1 = get_mysql_conn("api_auto")

  7. conn.execute_dml("insert into test_xdist(msg) values ('%s')" % "class_setup-数据库写入测试")

  8. conn1.fetchone("select * from test_xdist limit 1")

  9. @classmethod

  10. def teardown_class(cls):

  11. log.info("steup_teardowm")

  12. # 获取redis连接,执行命令测试 # redis连接根据host单例,相同host返回同一个连接 rs = get_redis_conn("api_backend")

  13. rs1 = get_redis_conn("api_backend")

  14. rs.set("name", "xp")

  15. rs1.get("name")

  16. @allure.story("测试故事1")

  17. @pytest.mark.xfail(reason='预期失败用例')

  18. @user12ize("param", [{"title": "标题1", "param": 2, "assert": 3}])

  19. def test_case_one(self, param):

  20. sleep(1)

  21. allure.dynamic.description("测试故事1-描述信息")

  22. allure.dynamic.severity(allure.severity_level.CRITICAL) # 用例级别严重 # allure动态标题 allure.dynamic.title(param["title"])

  23. log.info("测试faker数据")

  24. log.info(f"{get_name()} {get_phone_number()} {get_id_card()}")

  25. # pytest.assume(False) # 多重断言插件,断言失败继续执行下面 assert param["param"] + 2 == param["assert"]

  26. @allure.story("查询学生接口")

  27. @user14cess # 此用例分组到可多进程跑测 @user15ize("param", api_backend["get_student"]["param_list"])

  28. def test_get_student(self, param, fixture_get_token):

  29. sleep(1)

  30. allure.dynamic.title(param["title"])

  31. data_dict = api_backend["get_student"]

  32. data_dict["headers"]["Cookie"] = fixture_get_token

  33. res = parser("api_backend", data_dict, param["data"])

  34. assert_res(res, param["assert"])

  35. @allure.story("新增学生接口")

  36. @user17cess # 此用例分组到可多进程跑测 @user18ize("param", api_backend["post_student"]["param_list"])

  37. def test_post_student(self, param):

  38. sleep(1)

  39. allure.dynamic.title(param["title"])

  40. data_dict = api_backend["post_student"]

  41. res = parser("api_backend", data_dict, param["data"])

  42. assert_res(res, param["assert"])

  43. @allure.story("更新学生接口")

  44. @user20cess # 此用例分组到可多进程跑测 @user21ize("param", api_backend["put_student"]["param_list"])

  45. def test_put_student(self, param):

  46. sleep(1)

  47. allure.dynamic.title(param["title"])

  48. data_dict = api_backend["put_student"]

  49. res = parser("api_backend", data_dict, param["data"])

  50. assert_res(res, param["assert"])

用例数据驱动

 
  1. # line_of_business/service_name_api_backend/data.py

  2. # 服务名外层大字典,参数key是接口名,value是接口的请求信息字典,用例模块可通过接口名引用接口信息字典

  3. # param_list 参数化列表,用于pytest参数化,每次选取其中一项的data,去更新外部data通用参数,发起请求api_backend = {

  4. "get_student": dict(path="/student",

  5. method="get",

  6. # headers 不包含Content-Type 则request使用params传参 headers={},

  7. # 通用参数,每次请求使用 data={"test": "test"},

  8. # 参数化参数,每次使用其中一项,更新通用参数 param_list=[

  9. {"title": "获取学生信息-张三", "data": {"name": "张三"}, "assert": {"code": 0, "msg": "ok"}},

  10. {"title": "获取学生信息-李四", "data": {"name": "李四"}, "assert": {"code": 0, "msg": "ok"}},

  11. {"title": "获取学生信息-王五", "data": {"name": "王五"}, "assert": {"code": 0, "msg": "ok"}}

  12. ]),

  13. 'post_student': dict(path="/student",

  14. method="post",

  15. # headers Content-Type = application/x-www-form-urlencoded 则使用 request使用data传参 headers={"Cookie": "", "Content-Type": "application/x-www-form-urlencoded"},

  16. data={"test": "test"},

  17. param_list=[

  18. {"title": "新增学生信息-张三", "data": {"name": "张三"}, "assert": {"code": 1, "msg": "ok"}},

  19. {"title": "新增学生信息-李四", "data": {"name": "李四"}, "assert": {"code": 0, "msg": "ok"}},

  20. {"title": "新增学生信息-王五", "data": {"name": "王五"}, "assert": {"code": 0, "msg": "ok"}}

  21. ]),

  22. 'put_student': dict(path="/student",

  23. method="put",

  24. # headers Content-Type = application/json 则使用 request使用json传参 headers={"Cookie": "", "Content-Type": "application/json"},

  25. data={"test": "test"},

  26. param_list=[

  27. {"title": "更新学生信息-张三", "data": {"name": "张三"}, "assert": {"code": 0, "msg": "ok"}},

  28. {"title": "更新学生信息-李四", "data": {"name": "李四"}, "assert": {"code": 0, "msg": "okk"}},

  29. {"title": "更新学生信息-王五", "data": {"name": "王五"}, "assert": {"code": 0, "msg": "ok"}}

  30. ])}

调试运行入口

 
  1. # run.pyimport pytestimport os# 用例调试入口if __name__ == '__main__':

  2. pytest.main([r"line_of_business_name", "--clean-alluredir", "--alluredir=allure_result", "--cache-clear", "--env=prod"])

  3. # pytest.main([r"-m multiprocess", "--clean-alluredir", "--alluredir=allure_result", "-n 3", "--cache-clear", "--env=prod"]) os.system(r"allure generate allure_result -c -o allure_report")

  4. os.system(r"allure open -h 127.0.0.1 -p 8899 allure_report")

失败用例重跑

 
  1. # failed_run.pyimport pytestimport os# 失败用例重跑if __name__ == '__main__':

  2. pytest.main([r"line_of_business_name", "--lf", "--clean-alluredir", "--alluredir=allure_result", "--env=prod"])

  3. os.system(r"allure generate allure_result -c -o allure_report")

  4. os.system(r"allure open -h 127.0.0.1 -p 8899 allure_report")

报告展示

 

感谢每一个认真阅读我文章的人,礼尚往来总是要有的,虽然不是什么很值钱的东西,如果你用得到的话可以直接拿走:

这些资料,对于【软件测试】的朋友来说应该是最全面最完整的备战仓库,这个仓库也陪伴上万个测试工程师们走过最艰难的路程,希望也能帮助到你!有需要的小伙伴可以点击下方小卡片领取   

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com