#!/usr/bin/env python # -*- encoding: utf-8 -*- """ @Version: 1.0 @Python Version:3.6.6 @Author: ludq1 @Email: ludq1@chinaunicom.cn @date: 2023/04/07 11:40:00 @Description: """ import functools import logging from typing import List, Union from flask import g, current_app, Flask from .globalconst import GlobalConst from .globalutility import Utility from .my_baseexception import MyBaseException from .my_stringutils import MyStringUtils def get_current_app() -> Flask: r''' 获取当前current_app Returns: ''' return current_app._get_current_object() # pylint: disable=protected-access def get_app_logger() -> logging.Logger: r''' 获取 current_app 的logger Returns: ''' return get_current_app().logger def gen_userinfo(global_obj=None, redis_info: dict = None, key_userinfo: str = GlobalConst.G_KEY_USERINFO): r''' 装饰器函数, 用于获取用户基本信息,角色信息,权限列表信息等 Args: global_obj: 存储用户信息的全局变量,默认为 Flask.g redis_info: 获取用户信息的 redis 信息 , 默认为使用环境变量读取指定配置文件的内容 key_userinfo: 存储用户信息时使用的key Returns: ''' def decorator(func): @functools.wraps(func) def wrapper(*args, **kw): try: # LONG_TODO 访问redis 获取用户信息 print(redis_info) dict_userinfo: dict = None setattr(global_obj, key_userinfo, dict_userinfo) except Exception: pass return func(*args, **kw) return wrapper return decorator def login_required(global_obj=None, key_userinfo: str = GlobalConst.G_KEY_USERINFO, key_usersystem_in_userinfo: str = GlobalConst.G_USERINFO_KEY_USER_SYSTEM, key_userid_in_userinfo: str = GlobalConst.G_USERINFO_KEY_USER_ID, expected_usersystem: str = None, retcode: str = GlobalConst.RETCODE_FOR_NOT_LOGIN, retval: str = GlobalConst.RETVAL_FOR_NOT_LOGIN): r''' 装饰器函数, 用户判断用户是否已经登陆 Args: global_obj: 获取用户信息时使用的全局变量,默认为 Flask.g key_userinfo: 获取用户信息时使用的key key_usersystem_in_userinfo: 获取用户所属系统时使用的key key_userid_in_userinfo: 获取用户id时使用的key expected_usersystem: 期望的用户所属系统,默认为None, 此时用户信息中的用户系统为 None 或 default 时即匹配 retcode_for_not_login: retval_for_not_login: Returns: ''' def decorator(func): @functools.wraps(func) def wrapper(*args, **kw): # 加入一个环境变量, 用于更灵活的控制 if Utility.get_env(GlobalConst.ENV_LOGIN_REQUIRED) == '1': # 从 g 中获取用户信息, 判断用户是否已经登陆 gg_used = global_obj or g g_userinfo: dict = getattr(gg_used, key_userinfo, None) if g_userinfo is None or not g_userinfo: return MyBaseException.format_to_standard_dict(ret_code=retcode, ret_val=retval) # 判断用户信息中的用户类型是否是期望的用户 default_usersystem = 'default' g_usersystem = g_userinfo.get(key_usersystem_in_userinfo, default_usersystem) m_expected_usersystem = expected_usersystem or default_usersystem if not m_expected_usersystem == g_usersystem: return MyBaseException.format_to_standard_dict(ret_code=retcode, ret_val=retval, submitted_msg=Utility.join_str_with_none( '已登陆的用户所属系统(', g_usersystem, ')不是期望值(', m_expected_usersystem, ')')) # 获取用户id不为空,确认用户登陆 g_userid = g_userinfo.get(key_userid_in_userinfo, None) if MyStringUtils.is_empty(g_userid): return MyBaseException.format_to_standard_dict(ret_code=retcode, ret_val=retval, submitted_msg=Utility.join_str_with_none( '已登陆的用户id为空')) return func(*args, **kw) return wrapper return decorator def permission_required(permissions: Union[List[Union[List, str]], str], global_obj=None, key_userinfo: str = GlobalConst.G_KEY_USERINFO, key_permission_in_userinfo: str = GlobalConst.G_USERINFO_KEY_PERMISSION, retcode: str = GlobalConst.RETCODE_FOR_NO_PERMISSION, retval: str = GlobalConst.RETVAL_FOR_NO_PERMISSION): r''' 装饰器函数, 用户判断用户是否具备指定权限 Args: global_obj: 获取用户信息时使用的全局变量,默认为 Flask.g key_userinfo: 获取用户信息时使用的key key_permission_in_userinfo: 获取用户权限时使用的key permissions: 需要的权限或权限列表, 如果是字符串,则直接表示需要的权限,如果是列表,则表示列表中的任意权限满足即可,如果列表中的项还是列表(子列表),则子列表中的所有项都必须得到满足才认为该子项满足,同时该子项不能为空(为空认为无效) retcode_for_not_login: retval_for_not_login: Returns: ''' def decorator(func): @functools.wraps(func) def wrapper(*args, **kw): # 加入一个环境变量, 用于更灵活的控制 if Utility.get_env(GlobalConst.ENV_PERMISSION_REQUIRED) == '1': # 从 g 中获取用户信息, 判断用户是否是否具备指定权限 # 判断用户是否已经登陆 gg_used = global_obj or g g_userinfo: dict = getattr(gg_used, key_userinfo, None) if not g_userinfo: return MyBaseException.format_to_standard_dict(ret_code=retcode, ret_val=retval, submitted_msg=GlobalConst.RETVAL_FOR_NOT_LOGIN) # 获取用户权限列表 g_permissions: tuple = g_userinfo.get(key_permission_in_userinfo, None) if not g_permissions: return MyBaseException.format_to_standard_dict(ret_code=retcode, ret_val=retval, submitted_msg='用户权限列表的key不存在') bool_satisfied = False if isinstance(permissions, str): # 如果需求的权限是字符串 if permissions in g_permissions: bool_satisfied = True else: # 如果需求的权限不是字符串,则进行迭代遍历, 有一项满足即认为是满足 for tmp_required_content in permissions: if isinstance(tmp_required_content, str): # 如果迭代项是字符串, 有一项满足,即认为满足 if tmp_required_content in g_permissions: bool_satisfied = True break else: # 如果迭代项不是字符串, 则进行迭代遍历, 此时必须满足所有项才认为是满足 # 同时要求的权限不能为空 if tmp_required_content: tmp_bool_satified = True for tmp_tmp_required_content in tmp_required_content: if not tmp_tmp_required_content in g_permissions: tmp_bool_satified = False break if tmp_bool_satified: bool_satisfied = True break if not bool_satisfied: return MyBaseException.format_to_standard_dict(ret_code=retcode, ret_val=retval, submitted_msg=Utility.join_str('用户缺少权限', permissions)) return func(*args, **kw) return wrapper return decorator def role_required(roleids: Union[List[str], str], global_obj=None, key_userinfo: str = GlobalConst.G_KEY_USERINFO, key_role_in_userinfo: str = GlobalConst.G_USERINFO_KEY_USERROLE, retcode: str = GlobalConst.RETCODE_FOR_MISS_ROLE, retval: str = GlobalConst.RETVAL_FOR_MISS_ROLE): r''' 装饰器函数, 用户判断用户是否具备指定角色 Args: global_obj: 获取用户信息时使用的全局变量,默认为 Flask.g key_userinfo: 获取用户信息时使用的key key_role_in_userinfo: 获取用户权限时使用的key roleids: 需要的角色或角色列表, 如果是字符串,则直接表示需要的权限,如果是列表,则表示列表中的任意权限满足即可,如果列表中的项还是列表(子列表),则子列表中的所有项都必须得到满足该子项才算作是满足,同时该子项不能为空(为空认为无效) retcode_for_not_login: retval_for_not_login: Returns: ''' def decorator(func): @functools.wraps(func) def wrapper(*args, **kw): # 加入一个环境变量, 用于更灵活的控制 if Utility.get_env(GlobalConst.ENV_ROLE_REQUIRED) == '1': # 从 g 中获取用户信息, 判断用户是否是否具备指定角色 # 判断用户是否已经登陆 gg_used = global_obj or g g_userinfo: dict = getattr(gg_used, key_userinfo, None) if not g_userinfo: return MyBaseException.format_to_standard_dict(ret_code=retcode, ret_val=retval, submitted_msg=GlobalConst.RETVAL_FOR_NOT_LOGIN) # 获取用户权限列表 g_roles: tuple = g_userinfo.get(key_role_in_userinfo, None) if not g_roles: return MyBaseException.format_to_standard_dict(ret_code=retcode, ret_val=retval, submitted_msg='用户角色列表的key不存在') bool_satisfied = False if isinstance(roleids, str): # 如果需求的权限是字符串 if roleids in g_roles: bool_satisfied = True else: # 如果需求的权限不是字符串,则进行迭代遍历, 有一项满足即认为是满足 for tmp_required_content in roleids: if isinstance(tmp_required_content, str): # 如果迭代项是字符串, 有一项满足,即认为满足 if tmp_required_content in g_roles: bool_satisfied = True break else: # 如果迭代项不是字符串, 则进行迭代遍历, 此时必须满足所有项才认为是满足 # 同时要求的权限不能为空 if tmp_required_content: tmp_bool_satified = True for tmp_tmp_required_content in tmp_required_content: if not tmp_tmp_required_content in g_roles: tmp_bool_satified = False break if tmp_bool_satified: bool_satisfied = True break if not bool_satisfied: return MyBaseException.format_to_standard_dict(ret_code=retcode, ret_val=retval, submitted_msg=Utility.join_str('用户缺少角色', roleids)) return func(*args, **kw) return wrapper return decorator