草庐IT

python 之 session鉴权的处理

奈非天 2023-05-31 原文

一、session鉴权的处理

1. requests的会话对象

就像一个浏览器一样,它会在同一个会话中自动处理cookie信息,不需要写任何额外的代码。

import requests
 
 
session = requests.Session()  # 理解为就是一个浏览器
 
 
type(session)
 
 
requests.sessions.Session
 
 
session.post()# 登录
session.get() # 获取某个数据,会自动携带上一步收到的cookie
 
 
# 课堂派案例
headers = {'cookie': 'FZ_STROAGE.ketangpai.com=eyJTRUVTSU9OSUQiOiIzMTI5MjRiNTU2MzNmMDUxIiwiU0VFU0lPTkRBVEUiOjE2Mzk1NzA0NDQ3Njd9; ARK_ID=undefined; ketangpai_home_slb=3fbda3fc94d5d1be63720d9c156288d0; PHPSESSID=kmugv5id4lcecie33asikt3p96; ketangpai_home_remember=think%3A%7B%22username%22%3A%22MDAwMDAwMDAwMLV2x5eHz7dthN523LWtftmC0IDak4NubQ%22%2C%22expire%22%3A%22MDAwMDAwMDAwMLOGvd6IubtrhKiGl7G2dZ4%22%2C%22token%22%3A%22MDAwMDAwMDAwMMurrpWavLehhs1-lbO5hZWEzYfcepuomcWmmqaMiHtnr5ypzYWosKKZq6HQxtOK0ZCme5p-q6iZu2yrn4uNhJ3KedDYk7ivboS4jt6zuY2Ug6eDl36KYW0%22%2C%22sign%22%3A%2207f1bd0c97817e6d7ebe92bfe8e30fe9%22%7D',
          'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/94.0.4606.61 Safari/537.36'}
res = requests.get(url='https://v4.ketangpai.com/UserApi/getUserInfo')
 
 
res.status_code
 
 
200
 
 
res.cookies
 
 
<RequestsCookieJar[Cookie(version=0, name='PHPSESSID', value='krm5vua2f6f07m5rjipa0uti16', port=None, port_specified=False, domain='v4.ketangpai.com', domain_specified=False, domain_initial_dot=False, path='/', path_specified=True, secure=True, expires=None, discard=True, comment=None, comment_url=None, rest={'HttpOnly': None}, rfc2109=False), Cookie(version=0, name='ketangpai_home_slb', value='3fbda3fc94d5d1be63720d9c156288d0', port=None, port_specified=False, domain='v4.ketangpai.com', domain_specified=False, domain_initial_dot=False, path='/', path_specified=True, secure=True, expires=None, discard=True, comment=None, comment_url=None, rest={'httponly': None}, rfc2109=False)]>
 
 
res.json()
 
 
{'status': 0, 'info': '您还未登陆!'}
 
 
session = requests.Session() # 1. 创建一个session对象
 
 
# 2. 登录
login_url = 'https://v4.ketangpai.com/UserApi/login'
data = {'email': '877***9301@qq.com',
        'password': 'Pyt***inlan',
        'remember': 0}
# json data params
response = session.post(url=login_url, data=data)
 
 
session.cookies
 
 
<RequestsCookieJar[Cookie(version=0, name='PHPSESSID', value='5if12vo96vtulhfhr9bvu1nnr2', port=None, port_specified=False, domain='v4.ketangpai.com', domain_specified=False, domain_initial_dot=False, path='/', path_specified=True, secure=True, expires=None, discard=True, comment=None, comment_url=None, rest={'HttpOnly': None}, rfc2109=False), Cookie(version=0, name='ketangpai_home_remember', value='think%3A%7B%22username%22%3A%22MDAwMDAwMDAwMLV2x5eHz7dthN523LWtftmC0IDak4NubQ%22%2C%22expire%22%3A%22MDAwMDAwMDAwMLOGvd6IubtrhKigl7O2dZ4%22%2C%22token%22%3A%22MDAwMDAwMDAwMMurrpWavLehhs1-lbO5hZWEzYfcepuomcWmmqaMiHtnr5ypzYWosKKZq6HQxtOK0ZCme5p-q6iZu2yrn4uNhJ3KedDYk7ivboS4jt6zuY2Ug6edl4CKYW0%22%2C%22sign%22%3A%2298880a4b0ee67193316c6c40dd40441f%22%7D', port=None, port_specified=False, domain='v4.ketangpai.com', domain_specified=False, domain_initial_dot=False, path='/', path_specified=True, secure=True, expires=1639581779, discard=False, comment=None, comment_url=None, rest={'httponly': None}, rfc2109=False), Cookie(version=0, name='ketangpai_home_slb', value='3fbda3fc94d5d1be63720d9c156288d0', port=None, port_specified=False, domain='v4.ketangpai.com', domain_specified=False, domain_initial_dot=False, path='/', path_specified=True, secure=True, expires=None, discard=True, comment=None, comment_url=None, rest={'httponly': None}, rfc2109=False)]>
 
 
res = session.get(url='https://v4.ketangpai.com/UserApi/getUserInfo')
 
 
res.json()
 
 
{'status': 1,
 'data': {'id': 'MDAwMDAwMDAwMLSGz96Iqb9phLVyoQ',
  'username': '****',
  'avatar': 'http://v4.ketangpai.com/Public/Common/img/40/26.png',
  'usertype': '1',
  'email': '877***01@qq.com',
  'stno': '',
  'atteststate': 0,
  'attestInfo': []}}
 
 
# 如果不用session对象,每一步都需要自己处理cookie
 
 
login_url = 'https://v4.ketangpai.com/UserApi/login'
data = {'email': 877***9301@qq.com',
        'password': 'Pyt***inlan',
        'remember': 0}
# 1.登录
response = requests.post(url=login_url, data=data)
 
 
response.status_code
 
 
200
 
 
response.json()
 
 
{'status': 1,
 'url': '/Main/index.html',
 'token': 'MDAwMDAwMDAwMMurrpWavLehhs1-lbO5hZWEzYfcepuomcWmmqaMiHtnr5ypzYWosKKZq6HQxtOK0ZCme5p-haiZu2yrn4uNhJ3KedDYk7ivboS4jt6zuY2Ug7d33n96YW0',
 'isenterprise': 0,
 'uid': 'MDAwMDAwMDAwMLSGz96Iqb9phLVyoQ'}
 
 
# 2.获取数据
res = requests.get(url='https://v4.ketangpai.com/UserApi/getUserInfo', cookies=response.cookies)
 
 
res.json()
 
 
{'status': 1,
 'data': {'id': 'MDAwMDAwMDAwMLSGz96Iqb9phLVyoQ',
  'username': '****',
  'avatar': 'http://v4.ketangpai.com/Public/Common/img/40/26.png',
  'usertype': '1',
  'email': '877***01@qq.com',
  'stno': '',
  'atteststate': 0,
  'attestInfo': []}}
 
 

requests库的session对象仅仅只是自动帮我们处理了cookie的携带问题。

2. 封装处理session鉴权的http请求函数

思路:

  • 保证在一个会话中使用同一个会话对象即可
  • 给每一个用例类创建一个会话对象即可。

import json
import unittest
from jsonpath import jsonpath
import setting
from common import logger, db
from common.data_handler import (
    replace_args_by_re,
    generate_no_usr_phone)
from common.encrypt_handler import generate_sign
import requests


class BaseCase(unittest.TestCase):
    """
    用例基类
    """
    db = db
    logger = logger
    setting = setting
    name = 'base_case'
    session = requests.session()  # 创建一个session对象用来处理session鉴权

    @classmethod
    def setUpClass(cls) -> None:
        cls.logger.info('==========={}接口开始测试==========='.format(cls.name))

    @classmethod
    def tearDownClass(cls) -> None:
        cls.logger.info('==========={}接口结束测试==========='.format(cls.name))

    def flow(self, item):
        """
        测试流程
        """
        self.logger.info('>>>>>>>用例{}开始执行>>>>>>>>'.format(item['title']))
        # 把测试数据绑定到方法属性case上,其他也要把一些变量绑定到对象的属性上
        self._case = item
        # 1. 处理测试数据
        self.process_test()
        # 2. 发送请求
        self.send_request()
        # 3. 断言
        self.assert_all()

    def process_test(self):
        """
        测试数据的处理
        """
        # 1.1 生成测试数据
        self.generate_test_data()
        # 1.2 替换依赖参数
        self.replace_args()
        # 1.3 处理url
        self.process_url()
        # 1.4 鉴权处理
        self.auth_process()

    def auth_process(self):
        """
        v3版本鉴权处理
        :return:
        """
        request_data = self._case.get('request_data')
        if request_data:
            headers = request_data.get('headers')
            if headers:
                if headers.get('X-Lemonban-Media-Type') == 'lemonban.v3':
                    # 获取token
                    token = self._case['request_data']['headers']['Authorization'].split(' ')[-1]
                    # 生成签名
                    sign, timestamp = generate_sign(token, self.setting.SERVER_RSA_PUB_KEY)
                    # 添加到请求数据中
                    self._case['request_data']['json']['sign'] = sign
                    self._case['request_data']['json']['timestamp'] = timestamp

        # if self._case['request_data']['headers']['X-Lemonban-Media-Type'] == 'lemonban.v3':
        #     # 获取token
        #     token = self._case['request_data']['headers']['Authorization'].split(' ')[-1]
        #     # 生成签名
        #     sign, timestamp = generate_sign(token, self.setting.SERVER_RSA_PUB_KEY)
        #     # 添加到请求数据中
        #     self._case['request_data']['json']['sign'] = sign
        #     self._case['request_data']['json']['timestamp'] = timestamp

    def generate_test_data(self):
        """
        生成测试数据
        """
        """
           生成测试数据,不是固定流程,有不同可以复写
           :return:
           """
        self._case = json.dumps(self._case)
        if '$phone_number$' in self._case:
            phone = generate_no_usr_phone()
            self._case = self._case.replace('$phone_number$', phone)
        self._case = json.loads(self._case)

    def replace_args(self):
        """
        替换参数
        """
        self._case = json.dumps(self._case)  # 把用例数据dumps成字符串,一次替换
        self._case = replace_args_by_re(self._case, self)
        self._case = json.loads(self._case)
        # 再将request_data, expect_data loads为字典
        try:
            self._case['request_data'] = json.loads(self._case['request_data'])
            self._case['expect_data'] = json.loads(self._case['expect_data'])
        except Exception as e:
            self.logger.error('{}用例的测试数据格式不正确'.format(self._case['title']))
            raise e

    def process_url(self):
        """
        处理url
        """

        if self._case['url'].startswith('http'):
            # 是否是全地址
            pass
        elif self._case['url'].startswith('/'):
            # 是否是短地址
            self._case['url'] = self.setting.PROJECT_HOST + self._case['url']
        else:
            # 接口名称
            try:
                self._case['url'] = self.setting.INTERFACES[self._case['url']]
            except Exception as e:
                self.logger.error('接口名称:{}不存在'.format(self._case['url']))
                raise e

    def send_request(self):
        """
        发送请求
        @return:
        """
        self._response = self.session.request(
            url=self._case['url'], method=self._case['method'], **self._case['request_data'])
        # self._response = send_http_request(url=self._case['url'], method=self._case['method'],
        #                                    **self._case['request_data'])

    def assert_all(self):
        try:
            # 3.1 断言响应状态码
            self.assert_status_code()
            # 3.2 断言响应数据
            self.assert_response()
            # 响应结果断言成功后就提取依赖数据
            self.extract_data()
            # 3.3 数据库断言后面的任务
            self.assert_sql()
        except  Exception as e:
            self.logger.error('++++++用例{}测试失败'.format(self._case['title']))
            raise e
        else:
            self.logger.info('<<<<<<<<<用例{}测试成功<<<<<<<'.format(self._case['title']))

    def assert_status_code(self):
        """
        断言响应状态码
        """
        try:
            self.assertEqual(self._case['status_code'], self._response.status_code)
        except AssertionError as e:
            self.logger.warning('用例【{}】响应状态码断言异常'.format(self._case['title']))
            raise e
        else:
            self.logger.info('用例【{}】响应状态码断言成功'.format(self._case['title']))

    def assert_response(self):
        """
        断言响应数据
        """
        if self._case['res_type'].lower() == 'json':
            res = self._response.json()
        elif self._case['res_type'].lower() == 'html':
            # 扩展思路
            res = self._response.text
        try:
            self.assertEqual(self._case['expect_data'], {'code': res['code'], 'msg': res['msg']})
        except AssertionError as e:
            self.logger.warning('用例【{}】响应数据断言异常'.format(self._case['title']))
            self.logger.warning('用例【{}】期望结果为:{}'.format(self._case['title'], self._case['expect_data']))
            self.logger.warning('用例【{}】的响应结果:{}'.format(self._case['title'], res))
            raise e
        else:
            self.logger.info('用例【{}】响应数据断言成功'.format(self._case['title']))

    def assert_sql(self):
        """
        断言数据库
        """
        if self._case.get('sql'):  # 返回指定键的值,如果键不在字典中返回默认值 None 或者设置的默认值。
            # 只有sql字段有sql的才需要校验数据库
            # 只有sql字段有sql的才需要校验数据库
            sqls = self._case['sql'].split(',')
            for sql in sqls:
                try:
                    self.assertTrue(self.db.exist(sql))
                except AssertionError as e:
                    self.logger.warning('用例【{}】数据库断言异常,执行的sql为:{}'.format(self._case['title'], sql))
                    raise e
                except Exception as e:
                    self.logger.warning('用例【{}】数据库断言异常,执行的sql为:{}'.format(self._case['title'], sql))
                    raise e

    def extract_data(self):
        """
        根据提取表达式提取对应的数据
        :return:
        """
        if self._case.get('extract'):
            if self._case['res_type'].lower() == 'json':
                self.extract_data_from_json()
            elif self._case['res_type'].lower() == 'html':
                self.extract_data_from_html()
            elif self._case['res_type'].lower() == 'xml':
                self.extract_data_from_xml()
            else:
                raise ValueError('res_type类型不正确,只支持json,html,xml')

    def extract_data_from_json(self):
        """
        从json数据中提取数据并绑定到类属性中
        :return:
        """
        try:
            rules = json.loads(self._case.get('extract'))
        except Exception as e:
            raise ValueError('用例【{}】的extract字段数据:{}格式不正确'.format(self._case['title'], self._case['extract']))
        for rule in rules:
            # 类属性名
            name = rule[0]
            # 提取表达式
            exp = rule[1]
            # 根据jsonpath去响应中提取值
            value = jsonpath(self._response.json(), exp)
            # 如果能提取到值
            if value:
                # 把值绑定到对应的类属性上
                setattr(self.__class__, name, value[0])  # 注意value是个列表
            else:
                # 提取不到值,说明jsonpath写错了,或者是响应又问题
                raise ValueError('用例【{}】的提取表达式{}提取不到数据'.format(self._case['title'], self._case['extract']))

    def extract_data_from_html(self):
        """
        从html数据中提取数据并绑定到类属性中
        :return:
        """
        raise ValueError('请实现此方法')

    def extract_data_from_xml(self):
        """
        从xml数据中提取数据并绑定到类属性中
        :return:
        """
        raise ValueError('请实现此方法')
 
 

from unittestreport import ddt, list_data
from common.base_case import BaseCase

cases = [
    {'title': '课堂派登录',
     'method': 'post',
     'url': 'https://v4.ketangpai.com/UserApi/login',
     'request_data': '{"data": {"email": "877***01@qq.com", "password": "Pyt***inlan", "remember": 0}}',
     'status_code': 200,
     'res_type': 'json',
     'expect_data': '{"status": 1}'
     },
    {
        'title': '获取用户信息',
        'method': 'get',
        'url': 'https://v4.ketangpai.com/UserApi/getUserInfo',
        'request_data': '{}',
        'status_code': 200,
        'res_type': 'json',
        'expect_data': '{"status": 1}'
    }
]


@ddt
class TestCourseFlow(BaseCase):
    name = "课堂派测试"

    @list_data(cases)
    def test_course(self, item):
        self.flow(item)

    def assert_response(self):
        """
        复写assert_response方法实现课堂派的断言
        :return:
        """
        if self._case['res_type'].lower() == 'json':
            res = self._response.json()
        elif self._case['res_type'].lower() == 'html':
            # 扩展思路
            res = self._response.text
        try:
            self.assertEqual(self._case['expect_data'], {'status': res['status']})
        except AssertionError as e:
            self.logger.warning('用例【{}】响应数据断言异常'.format(self._case['title']))
            self.logger.warning('用例【{}】期望结果为:{}'.format(self._case['title'], self._case['expect_data']))
            self.logger.warning('用例【{}】的响应结果:{}'.format(self._case['title'], res))
            raise e
        else:
            self.logger.info('用例【{}】响应数据断言成功'.format(self._case['title']))


if __name__ == '__main__':
    BaseCase.unittest.main()

 

有关python 之 session鉴权的处理的更多相关文章

  1. python - 如何使用 Ruby 或 Python 创建一系列高音调和低音调的蜂鸣声? - 2

    关闭。这个问题是opinion-based.它目前不接受答案。想要改进这个问题?更新问题,以便editingthispost可以用事实和引用来回答它.关闭4年前。Improvethisquestion我想在固定时间创建一系列低音和高音调的哔哔声。例如:在150毫秒时发出高音调的蜂鸣声在151毫秒时发出低音调的蜂鸣声200毫秒时发出低音调的蜂鸣声250毫秒的高音调蜂鸣声有没有办法在Ruby或Python中做到这一点?我真的不在乎输出编码是什么(.wav、.mp3、.ogg等等),但我确实想创建一个输出文件。

  2. ruby - 如何指定 Rack 处理程序 - 2

    Rackup通过Rack的默认处理程序成功运行任何Rack应用程序。例如:classRackAppdefcall(environment)['200',{'Content-Type'=>'text/html'},["Helloworld"]]endendrunRackApp.new但是当最后一行更改为使用Rack的内置CGI处理程序时,rackup给出“NoMethodErrorat/undefinedmethod`call'fornil:NilClass”:Rack::Handler::CGI.runRackApp.newRack的其他内置处理程序也提出了同样的反对意见。例如Rack

  3. Python 相当于 Perl/Ruby ||= - 2

    这个问题在这里已经有了答案:关闭10年前。PossibleDuplicate:Pythonconditionalassignmentoperator对于这样一个简单的问题表示歉意,但是谷歌搜索||=并不是很有帮助;)Python中是否有与Ruby和Perl中的||=语句等效的语句?例如:foo="hey"foo||="what"#assignfooifit'sundefined#fooisstill"hey"bar||="yeah"#baris"yeah"另外,类似这样的东西的通用术语是什么?条件分配是我的第一个猜测,但Wikipediapage跟我想的不太一样。

  4. java - 什么相当于 ruby​​ 的 rack 或 python 的 Java wsgi? - 2

    什么是ruby​​的rack或python的Java的wsgi?还有一个路由库。 最佳答案 来自Python标准PEP333:Bycontrast,althoughJavahasjustasmanywebapplicationframeworksavailable,Java's"servlet"APImakesitpossibleforapplicationswrittenwithanyJavawebapplicationframeworktoruninanywebserverthatsupportstheservletAPI.ht

  5. 华为OD机试用Python实现 -【明明的随机数】 2023Q1A - 2

    华为OD机试题本篇题目:明明的随机数题目输入描述输出描述:示例1输入输出说明代码编写思路最近更新的博客华为od2023|什么是华为od,od薪资待遇,od机试题清单华为OD机试真题大全,用Python解华为机试题|机试宝典【华为OD机试】全流程解析+经验分享,题型分享,防作弊指南华为o

  6. python - 如何读取 MIDI 文件、更改其乐器并将其写回? - 2

    我想解析一个已经存在的.mid文件,改变它的乐器,例如从“acousticgrandpiano”到“violin”,然后将它保存回去或作为另一个.mid文件。根据我在文档中看到的内容,该乐器通过program_change或patch_change指令进行了更改,但我找不到任何在已经存在的MIDI文件中执行此操作的库.他们似乎都只支持从头开始创建的MIDI文件。 最佳答案 MIDIpackage会为您完成此操作,但具体方法取决于midi文件的原始内容。一个MIDI文件由一个或多个音轨组成,每个音轨是十六个channel中任何一个上的

  7. 「Python|Selenium|场景案例」如何定位iframe中的元素? - 2

    本文主要介绍在使用Selenium进行自动化测试或者任务时,对于使用了iframe的页面,如何定位iframe中的元素文章目录场景描述解决方案具体代码场景描述当我们在使用Selenium进行自动化测试的时候,可能会遇到一些界面或者窗体是使用HTML的iframe标签进行承载的。对于iframe中的标签,如果直接查找是无法找到的,会抛出没有找到元素的异常。比如近在咫尺的例子就是,CSDN的登录窗体就是使用的iframe,大家可以尝试通过F12开发者模式查看到的tag_name,class_name,id或者xpath来定位中的页面元素,会抛出NoSuchElementException异常。解决

  8. python ffmpeg 使用 pyav 转换 一组图像 到 视频 - 2

    2022/8/4更新支持加入水印水印必须包含透明图像,并且水印图像大小要等于原图像的大小pythonconvert_image_to_video.py-f30-mwatermark.pngim_dirout.mkv2022/6/21更新让命令行参数更加易用新的命令行使用方法pythonconvert_image_to_video.py-f30im_dirout.mkvFFMPEG命令行转换一组JPG图像到视频时,是将这组图像视为MJPG流。我需要转换一组PNG图像到视频,FFMPEG就不认了。pyav内置了ffmpeg库,不需要系统带有ffmpeg工具因此我使用ffmpeg的python包装p

  9. Python 刷Leetcode题库,顺带学英语单词(31) - 2

    ValidPalindromeGivenastring,determineifitisapalindrome,consideringonlyalphanumericcharactersandignoringcases. [#125]Example:"Aman,aplan,acanal:Panama"isapalindrome."raceacar"isnotapalindrome.Haveyouconsiderthatthestringmightbeempty?Thisisagoodquestiontoaskduringaninterview.Forthepurposeofthisproblem

  10. ruby-on-rails - Rails 优雅地处理超时 session ? - 2

    使用rails4,ruby2。我在rails配置中为我的cookiesession设置了30分钟的超时时间。问题是,如果我转到表单,让session超时,然后提交表单,我会收到此ActionController::InvalidAuthenticityToken错误。如何在Rails中优雅地处理这个错误?比如说,重定向到登录屏幕? 最佳答案 在您的ApplicationController:rescue_fromActionController::InvalidAuthenticityTokendoredirect_tosome_p

随机推荐