#!/usr/bin/env python # -*- encoding: utf-8 -*- """ @Version: 1.0 @Python Version:3.6.6 @Author: ludq1 @Email: ludq1@chinaunicom.cn @date: 2023/04/07 11:40:00 @Description: """ from typing import Union from flask import request, Response, make_response, g from .globalconst import GlobalReqKeyConst, GlobalConst, UserInfoKeyConst, EnvNameConst from .globalutility import Utility from .httptrans import HttpTrans from .my_baseexception import create_base_exception_with_dict, create_base_exception from .my_stringutils import MyStringUtils from .my_utils import MyUtils class ServiceHandle: r''' 服务访问便捷类 ''' _KEY_REQ_DICT_IN_REQUEST: str = 'req_dict' @classmethod def get_info_with_fromenv_first(cls, info_obj: Union[dict, str], info_key: str): r''' 如果 环境变量 LOCAL_RUN = 1 , 则优先获取 info_key 的环境变量对应的 内容 , 否则获取 info_obj 中该key对应的 内容 或 info_obj 如果是字符串,则直接获取该字符串 Args: info_obj: 要获取的内容或字典 info_key: 要获取的内容或字典中该内容对应的key,也是环境变量名称 Returns: ''' # 如果提供了 info_key 且 LOCAL_RUN==1, # 则优先获取 info_key 的环境变量对应的 内容 , if not MyStringUtils.is_empty(info_key) \ and \ MyStringUtils.equals(Utility.get_env(EnvNameConst.ENV_LOCAL_RUN), '1'): str_in_env = Utility.get_env(info_key) if str_in_env: return str_in_env if isinstance(info_obj, str): return info_obj return info_obj.get(info_key) @classmethod def get_mapwebarg(cls) -> dict: r''' 获取request中的mapWebArg,用于在处理request的过程中更改请求参数, 获取原始的请求参数请使用 request.values.to_dict Returns: ''' if not hasattr(request, cls._KEY_REQ_DICT_IN_REQUEST): a_dict = request.values.to_dict() setattr(request, cls._KEY_REQ_DICT_IN_REQUEST, a_dict) return a_dict else: return getattr(request, cls._KEY_REQ_DICT_IN_REQUEST, None) @classmethod def check_is_success(cls, ret_dict: dict, expect_retcode: str = GlobalConst.RETCODE_SUCESS_CODE) -> bool: r''' 快速判断标准格式的json的retcode是否为1 Args: ret_dict: Returns: ''' # 当 ret_dict 不为 None 且其 RETCODE为期望值时返回True return ret_dict is not None and MyStringUtils.equals(expect_retcode, MyStringUtils.to_str( ret_dict.get(GlobalConst.RETKEY_RET_CODE))) @classmethod def get_error_info(cls, ret_dict: dict, expect_retval_key: str = GlobalConst.RETKEY_RET_VAL) -> bool: r""" 获取错误信息, Args: ret_dict: 返回值 expect_retval_key: 期望获取的返回错误的key Returns: """ # 当 ret_dict 不为 None 且其 RETCODE为期望值时返回True return ret_dict is not None and expect_retval_key in ret_dict.keys() and ret_dict[expect_retval_key] @classmethod def querydb(cls, service_url: str, query_str: str, dict_header: dict = None) -> dict: r''' 查询数据库,封装的mysqlpool sql=select xxx 接口 Args: service_url: query_str:select * from tbname dict_header: Returns: ''' # 生成 trans-data trans_data = { GlobalReqKeyConst.SQL: MyStringUtils.to_str(query_str) } # 访问 Url try: headers = request.headers except: headers = None http_trans = HttpTrans(headers) return Utility.jsonstr2dict( http_trans.post_encodedata(service_url, trans_data=trans_data, dict_header=dict_header)) @classmethod def querycount_db(cls, service_url: str, query_str: str, dict_header: dict = None) -> int: r''' 查询数据库,执行count数据库查询,直接得到返回的整数 Args: service_url: query_str: Returns: ''' ret_dict = cls.querydb(service_url, query_str, dict_header=dict_header) if not cls.check_is_success(ret_dict): raise create_base_exception_with_dict(ret_dict) ret_dict = cls.safe_get_first_dict_from_list(cls.safe_get_datarows(ret_dict)) if ret_dict is None: raise create_base_exception('数据库查询语句返回结果集为空') return int(list(ret_dict.values())[0]) @classmethod def operate_db(cls, service_url: str, operate_sql_list: list, dict_header: dict = None) -> dict: r''' 操作数据库,mysqlpool trans,规范参考word Args: service_url: operate_sql_list: 要执行的sql语句列表 Returns: ''' # 生成 trans-data operate_sql_list_wrapper = [{GlobalReqKeyConst.TRANSSQL: tmp_sql_str} for tmp_sql_str in operate_sql_list] trans_data = { GlobalReqKeyConst.TRANSJSONARRAY: operate_sql_list_wrapper } # 访问 Url try: headers = request.headers except: headers = None http_trans = HttpTrans(headers) return Utility.jsonstr2dict( http_trans.post_encodedata(service_url, trans_data=trans_data, dict_header=dict_header)) @classmethod def call_proc(cls, service_url: str, proc_name: str, arg_dict: dict, dict_header: dict = None) -> dict: r''' 执行存储过程 mysqlpool proc web 接口 Args: service_url: proc_name: 要执行的sql语句列表 Returns: ''' # 生成 trans-data if not arg_dict: arg_dict = {} arg_dict[GlobalReqKeyConst.PROCEDURENAME] = proc_name # 访问 Url try: headers = request.headers except: headers = None http_trans = HttpTrans(headers) return Utility.jsonstr2dict( http_trans.post_encodedata(service_url, trans_data=arg_dict, dict_header=dict_header)) @classmethod def safe_get_first_dict_from_list(cls, a_list: list) -> dict: r''' 尝试从 list 中获取第一个 dict 对象,获取失败则返回 None Args: a_list: Returns: ''' try: return a_list[0] except: return None @classmethod def safe_get_list_from_dict(cls, a_dict: dict, key_name: str) -> list: r''' 尝试从 dict 获取指定key表示的 list ,出错则返回一个空的 list, 这里的safe是指 key 不存在时不出错,但是key对应的对象或字符串不是 list 时仍然会出错 Args: a_dict: key: Returns: ''' if a_dict is None or key_name is None: return [] tmp_obj = a_dict.get(key_name, None) if tmp_obj is None: return [] else: if isinstance(tmp_obj, list): return tmp_obj else: try: return list(tmp_obj) except: raise ValueError('获取{0}失败,{1}中的{0}不是list,且不能转成list'.format(key_name, a_dict)) @classmethod def safe_get_datarows(cls, a_dict: dict, key_name: str = GlobalConst.RETKEY_DATAROWS) -> list: r''' 尝试从 dict 获取指定 DataRows 表示的 list ,出错则返回一个空的 list, 这里的safe是指 key 不存在时不出错,但是key对应的对象或字符串不是 list 时仍然会出错 Args: a_dict: Returns: ''' return cls.safe_get_list_from_dict(a_dict=a_dict, key_name=key_name) @classmethod def do_make_response(cls, retinfo: Union[str, dict]) -> Response: r''' 将 retinfo转成字符串作为返回结果, 尤其是 retinfo为字典时会将其转换为 json字符串 Args: retinfo: Returns: ''' data_str = None if retinfo is None: data_str = MyStringUtils.EMPTY elif isinstance(retinfo, str): data_str = retinfo elif isinstance(retinfo, dict): data_str = MyUtils.dict2jsonstr(retinfo) else: try: # 尝试转成dict data_str = MyUtils.dict2jsonstr(dict(retinfo)) except: data_str = MyStringUtils.to_str(retinfo) response: Response = make_response(data_str, 200) response.content_type = GlobalConst.DEFAULTCONFIG_CONTENTTYPE response.content_encoding = GlobalConst.DEFAULTCONFIG_CHARACTERENCODING return response @classmethod def get_othermsg_from_exception(cls, exception: Exception) -> str: r''' 如果e是MyBaseException,则获取e.getSubmittedWebArg(),否则返回StringUtils.EMPTY Args: exception: Returns: ''' if MyUtils.has_attr(exception, 'submitted_webarg'): return MyStringUtils.to_str(getattr(exception, 'submitted_webarg', None)) else: return MyStringUtils.EMPTY @classmethod def put_userinfo_to_dict(cls, arg_dict: dict, self_serviceurl: str = None, global_obj=None, key_userinfo: str = GlobalConst.G_KEY_USERINFO, key_userid: str = UserInfoKeyConst.USER_ID, key_username: str = UserInfoKeyConst.USER_NAME, key_tenantid: str = UserInfoKeyConst.TENANT_ID, key_deptid: str = UserInfoKeyConst.DEPT_ID, key_deptshortname: str = UserInfoKeyConst.DEPT_NAME_SHORT, key_deptfullname: str = UserInfoKeyConst.DEPT_NAME_FULL, key_ouid: str = UserInfoKeyConst.OU_ID, key_ouname: str = UserInfoKeyConst.OU_NAME, key_email: str = UserInfoKeyConst.EMAIL) -> dict: r''' 将 g.userinfo 中的用户信息加入到 map 中, 必须保证 g.userinfo 有值 map一般就是mapWebArg, 加入的信息为 arg_myuserid arg_myusername arg_mytenantid arg_mydeptid arg_mydeptname arg_myouid arg_req_userid arg_req_tenantid 然后就是将要要鉴权的服务url添加进去, arg_req_serviceurl 是否覆盖前端传递的内容,由环境变量 DontOverrideUserInfoFromFront 决定 Args: arg_dict: self_serviceurl: Returns: ''' # 增加一个环境变量用于判断是否不覆盖前端传递的用户信息 # 默认为 不 不覆盖,即 覆盖 default_for_dont_override_userinfo = '0' dont_override_userinfo = MyUtils.get_env( EnvNameConst.ENV_DONT_OVERRITE_USERINFO_FROM_FRANT) or default_for_dont_override_userinfo if not MyStringUtils.equals(dont_override_userinfo, '1'): # 如果不是 不覆盖,则覆盖 userid = cls.get_userid(global_obj=global_obj, key_userinfo=key_userinfo, key_userid=key_userid) tenantid = cls.get_usertenantid(global_obj=global_obj, key_userinfo=key_userinfo, key_tenantid=key_tenantid) arg_dict['arg_myuserid'] = userid arg_dict['arg_myusername'] = cls.get_username(global_obj=global_obj, key_userinfo=key_userinfo, key_username=key_username) arg_dict['arg_mydeptid'] = cls.get_userdeptid(global_obj=global_obj, key_userinfo=key_userinfo, key_deptid=key_deptid) arg_dict['arg_mydeptname'] = cls.get_userdept_shortname(global_obj=global_obj, key_userinfo=key_userinfo, key_deptshortname=key_deptshortname) arg_dict['arg_mydeptname_full'] = cls.get_userdept_fullname(global_obj=global_obj, key_userinfo=key_userinfo, key_deptfullname=key_deptfullname) arg_dict['arg_myouid'] = cls.get_user_ouid(global_obj=global_obj, key_userinfo=key_userinfo, key_ouid=key_ouid) arg_dict['arg_myouname'] = cls.get_user_ouname(global_obj=global_obj, key_userinfo=key_userinfo, key_ouname=key_ouname) arg_dict['arg_mytenantid'] = tenantid arg_dict['arg_myemail'] = cls.get_user_email(global_obj=global_obj, key_userinfo=key_userinfo, key_email=key_email) arg_dict['arg_req_serviceurl'] = self_serviceurl arg_dict['arg_req_userid'] = userid arg_dict['arg_req_tenantid'] = tenantid else: # 如果 是 不覆盖,则不覆盖 userid = cls.get_userid(global_obj=global_obj, key_userinfo=key_userinfo, key_userid=key_userid) tenantid = cls.get_usertenantid(global_obj=global_obj, key_userinfo=key_userinfo, key_tenantid=key_tenantid) arg_dict.setdefault('arg_myuserid', userid) arg_dict.setdefault('arg_myusername', cls.get_username(global_obj=global_obj, key_userinfo=key_userinfo, key_username=key_username)) arg_dict.setdefault('arg_mydeptid', cls.get_userdeptid(global_obj=global_obj, key_userinfo=key_userinfo, key_deptid=key_deptid)) arg_dict.setdefault('arg_mydeptname', cls.get_userdept_shortname(global_obj=global_obj, key_userinfo=key_userinfo, key_deptshortname=key_deptshortname)) arg_dict.setdefault('arg_mydeptname_full', cls.get_userdept_fullname(global_obj=global_obj, key_userinfo=key_userinfo, key_deptfullname=key_deptfullname)) arg_dict.setdefault('arg_myouid', cls.get_user_ouid(global_obj=global_obj, key_userinfo=key_userinfo, key_ouid=key_ouid)) arg_dict.setdefault('arg_myouname', cls.get_user_ouname(global_obj=global_obj, key_userinfo=key_userinfo, key_ouname=key_ouname)) arg_dict.setdefault('arg_mytenantid', tenantid) arg_dict.setdefault('arg_myemail', cls.get_user_email(global_obj=global_obj, key_userinfo=key_userinfo, key_email=key_email)) arg_dict.setdefault('arg_req_serviceurl', self_serviceurl) arg_dict.setdefault('arg_req_userid', userid) arg_dict.setdefault('arg_req_tenantid', tenantid) return arg_dict @classmethod def get_userinfo(cls, global_obj=None, key_userinfo: str = GlobalConst.G_KEY_USERINFO) -> dict: r''' 从全局缓存中获取用户信息 Args: global_obj: key_userinfo: Returns: ''' gg_used = global_obj or g g_userinfo: dict = getattr(gg_used, key_userinfo, None) if g_userinfo is None or not g_userinfo: return None else: return dict(g_userinfo) @classmethod def get_key_from_userinfo(cls, global_obj=None, key_userinfo: str = GlobalConst.G_KEY_USERINFO, key: str = None, default: str = MyStringUtils.EMPTY) -> str: r''' 从全局缓存中获取用户信息 , 然后获取 key 对应的内容 , 不会返回None,只会返回 EMPTY Args: global_obj: key: default: Returns: ''' userinfo_dict = cls.get_userinfo(global_obj=global_obj, key_userinfo=key_userinfo) if userinfo_dict: return userinfo_dict.get(MyStringUtils.to_str(key)) or default else: return default @classmethod def get_usersystem(cls, global_obj=None, key_userinfo: str = GlobalConst.G_KEY_USERINFO, key_usersystem: str = UserInfoKeyConst.USER_SYSTEM, default_usersystem: str = None) -> str: r''' 从全局缓存中获取用户信息 , 然后获取 usersystem , 不会返回None,如果key不存在或dict不存在,默认返回 default Args: global_obj: key_userinfo: key_usersystem: Returns: ''' default_usersystem = default_usersystem or 'default' return cls.get_key_from_userinfo(global_obj=global_obj, key_userinfo=key_userinfo, key=key_usersystem, default=default_usersystem) @classmethod def get_userid(cls, global_obj=None, key_userinfo: str = GlobalConst.G_KEY_USERINFO, key_userid: str = UserInfoKeyConst.USER_ID) -> str: r''' 从全局缓存中获取用户信息 , 然后获取 userid , 不会返回None,只会返回 EMPTY Args: global_obj: key_userinfo: key_userid: Returns: ''' return cls.get_key_from_userinfo(global_obj=global_obj, key_userinfo=key_userinfo, key=key_userid, default=MyStringUtils.EMPTY) @classmethod def is_login(cls, global_obj=None, key_userinfo: str = GlobalConst.G_KEY_USERINFO, key_userid: str = UserInfoKeyConst.USER_ID, key_usersystem: str = UserInfoKeyConst.USER_SYSTEM, default_usersystem: str = None, expect_usersystem: str = None) -> str: r''' 从全局缓存中获取用户信息 , 然后获取 userid 和 usersystem, 判断用户的 usersystem 是否和预期的 usersystem 相同,并且有 登陆的userid Args: global_obj: key_userinfo: key_userid: key_usersystem: default_usersystem: 默认为 default expect_usersystem: 默认为 default Returns: ''' default_usersystem = default_usersystem or 'default' expect_usersystem = expect_usersystem or 'default' userid = cls.get_userid(global_obj=global_obj, key_userinfo=key_userinfo, key_userid=key_userid) usersystem = cls.get_usersystem(global_obj=global_obj, key_userinfo=key_userinfo, key_usersystem=key_usersystem, default_usersystem=default_usersystem) return (not MyStringUtils.is_empty(userid)) and MyStringUtils.equals(usersystem, expect_usersystem) @classmethod def get_username(cls, global_obj=None, key_userinfo: str = GlobalConst.G_KEY_USERINFO, key_username: str = UserInfoKeyConst.USER_NAME) -> str: r''' 从全局缓存中获取用户信息 , 然后获取 userid , 不会返回None,只会返回 EMPTY Args: global_obj: key_userinfo: key_username: Returns: ''' return cls.get_key_from_userinfo(global_obj=global_obj, key_userinfo=key_userinfo, key=key_username, default=MyStringUtils.EMPTY) @classmethod def get_usertenantid(cls, global_obj=None, key_userinfo: str = GlobalConst.G_KEY_USERINFO, key_tenantid: str = UserInfoKeyConst.TENANT_ID) -> str: r''' 从全局缓存中获取用户信息 , 然后获取 tenantid , 不会返回None,只会返回 EMPTY Args: global_obj: key_userinfo: key_tenantid: Returns: ''' return cls.get_key_from_userinfo(global_obj=global_obj, key_userinfo=key_userinfo, key=key_tenantid, default=MyStringUtils.EMPTY) @classmethod def get_userdeptid(cls, global_obj=None, key_userinfo: str = GlobalConst.G_KEY_USERINFO, key_deptid: str = UserInfoKeyConst.DEPT_ID) -> str: r''' 从全局缓存中获取用户信息 , 然后获取 , 不会返回None,只会返回 EMPTY Args: global_obj: key_userinfo: key_tenantid: Returns: ''' return cls.get_key_from_userinfo(global_obj=global_obj, key_userinfo=key_userinfo, key=key_deptid, default=MyStringUtils.EMPTY) @classmethod def get_userdept_shortname(cls, global_obj=None, key_userinfo: str = GlobalConst.G_KEY_USERINFO, key_deptshortname: str = UserInfoKeyConst.DEPT_NAME_SHORT) -> str: r''' 从全局缓存中获取用户信息 , 然后获取 , 不会返回None,只会返回 EMPTY Args: global_obj: key_userinfo: key_deptshortname: Returns: ''' return cls.get_key_from_userinfo(global_obj=global_obj, key_userinfo=key_userinfo, key=key_deptshortname, default=MyStringUtils.EMPTY) @classmethod def get_userdept_fullname(cls, global_obj=None, key_userinfo: str = GlobalConst.G_KEY_USERINFO, key_deptfullname: str = UserInfoKeyConst.DEPT_NAME_FULL) -> str: r''' 从全局缓存中获取用户信息 , 然后获取 , 不会返回None,只会返回 EMPTY Args: global_obj: key_userinfo: key_deptfullname: Returns: ''' return cls.get_key_from_userinfo(global_obj=global_obj, key_userinfo=key_userinfo, key=key_deptfullname, default=MyStringUtils.EMPTY) @classmethod def get_user_ouid(cls, global_obj=None, key_userinfo: str = GlobalConst.G_KEY_USERINFO, key_ouid: str = UserInfoKeyConst.OU_ID) -> str: r''' 从全局缓存中获取用户信息 , 然后获取 , 不会返回None,只会返回 EMPTY Args: global_obj: key_userinfo: key_ouid: Returns: ''' return cls.get_key_from_userinfo(global_obj=global_obj, key_userinfo=key_userinfo, key=key_ouid, default=MyStringUtils.EMPTY) @classmethod def get_user_ouname(cls, global_obj=None, key_userinfo: str = GlobalConst.G_KEY_USERINFO, key_ouname: str = UserInfoKeyConst.OU_NAME) -> str: r''' 从全局缓存中获取用户信息 , 然后获取 , 不会返回None,只会返回 EMPTY Args: global_obj: key_userinfo: key_ouname: Returns: ''' return cls.get_key_from_userinfo(global_obj=global_obj, key_userinfo=key_userinfo, key=key_ouname, default=MyStringUtils.EMPTY) @classmethod def get_user_email(cls, global_obj=None, key_userinfo: str = GlobalConst.G_KEY_USERINFO, key_email: str = UserInfoKeyConst.EMAIL) -> str: r''' 从全局缓存中获取用户信息 , 然后获取 , 不会返回None,只会返回 EMPTY Args: global_obj: key_userinfo: key_email: Returns: ''' return cls.get_key_from_userinfo(global_obj=global_obj, key_userinfo=key_userinfo, key=key_email, default=MyStringUtils.EMPTY)