#!/usr/bin/env python
# -*- encoding: utf-8 -*-
"""
@Version: 1.0
@Python Version:3.6.6
@Author: ludq1
@Email: ludq1@chinaunicom.cn
@date: 2023/04/07 11:40:00
@Description:
"""
import datetime
import logging
import random
import string
import threading
import traceback
from typing import Union, Optional

from flask import Request, make_response, Response, request, g
from pymysql import Connection

from .app_exception import AppException, AppRuntimeException
from .app_response import AppResponse
from .base_const import ConstResponseCode, ConstBaseApp
from .base_db_service import DbBehaviourAfterExec, BaseDbService
from .common_app_config import CommonAppConfig
from .logined_user import LoginedUserInfo
from .utils_http.http_cookie import HttpCookie
from .utils_http.http_utils import HttpUtils
from .utils_k8s.k8s_api_object_utils import OperationResultException
from .utils_tg_configcenter.configcenter_utils import ConfigCenterUtils
from .utils_tg_iam.iam_utils import IamUtils
from .utils_tg_sso.sso_utils import SsoUtils
from .utils_v2 import UtilityV2
from ..my_stringutils import MyStringUtils


class CommonBaseWebService:
    """
    Base class for micro-service request handlers.

    Subclasses are expected to override, as needed: ``app_config``,
    ``get_app_product_code()``, ``get_iam_domain()``,
    ``get_config_center_domain()``, ``app_utils``, ``gen_app_db_conn()`` and
    ``refresh_app_config()``.

    ``refresh_app_config()`` in particular must be overridden when the
    application wants to refresh its log level and its own application
    configuration dynamically.
    """

    need_login: bool = True
    r"""
    Whether the service requires a logged-in user. Defaults to True;
    subclasses override it as needed.
    """

    _conn: Connection = None
    r"""
    Database connection used by the service. A connection has to be closed
    after use and might be left open when an exception occurs, so the base
    class closes it centrally (see ``try_commit_and_close_conn``).
    A concrete service that needs a connection implements
    ``gen_app_db_conn`` to initialise this attribute and leaves closing to
    the base class. Example:

        def gen_app_db_conn(self) -> Connection:
            raise AppRuntimeException(
                message="TODO:gen_app_db_conn is To Implement",
                detail="TODO:gen_app_db_conn is To Implement"
            )
    """

    no_commit_close_behaviour = DbBehaviourAfterExec.gen_no_commit_close_behaviour()
    commit_and_close_behaviour = DbBehaviourAfterExec()
    commit_and_no_close_behaviour = DbBehaviourAfterExec(
        commit_after_exec=True,
        close_conn_after_exec=False,
        close_conn_when_exception=True,
        rollback_when_exception=True,
        commit_when_exception=False
    )

    _app_product_code: str = None
    r"""
    product_code used for IAM authorisation. Subclasses implement
    ``get_app_product_code`` to initialise this attribute. Example:

        def get_app_product_code(self) -> str:
            raise AppRuntimeException(
                message="TODO:get_app_product_code is To Implement",
                detail="TODO:get_app_product_code is To Implement"
            )
    """

    # Logger used by the service.
    logger: logging.Logger = None

    _app_config = None
    r"""
    Application configuration. Subclasses provide the property themselves.
    Example:

        @property
        def app_config(self) -> MyAppConfig:
            if self._app_config is None:
                self._app_config = MyAppConfig.load_app_config()
            return self._app_config

        @classmethod
        def load_app_config(cls, conf_filepath: str = None):
            if not conf_filepath:
                env_name = ConstGen.ENV_APP_CONFIG_FILEPATH
                conf_filepath = os.environ.get(env_name)
            if not conf_filepath:
                conf_filepath = ConstGen.DEFAULT_APP_CONFIGFILE_PATH
            app_config_dict = UtilityV2().load_file_as_dict(conf_filepath)
            return MyAppConfig(**app_config_dict)
    """

    common_app_config: CommonAppConfig = None
    r"""
    Common (shared) application configuration object.
    """

    is_debug: bool = False

    # Utility instance used by the service, cached to avoid re-creating it.
    _app_utils = None
    r"""
    Utility instance used by the service, cached to avoid re-creating it.
    Defaults to ``UtilityV2``. An application with its own ``UtilityV2``
    subclass overrides the property. Example:

        @property
        def app_utils(self) -> AppUtils:
            if self._app_utils is None:
                self._app_utils = AppUtils(logger=self.logger)
            return self._app_utils
    """

    http_utils: HttpUtils = None
    r"""
    Shared HTTP helper instance, cached to avoid re-creating it.
    """

    request: Request = None
    r"""
    The request handled by this service instance.
    """

    request_json: dict = None
    r"""
    JSON data received in the request body.
    """

    request_values: dict = None
    r"""
    Query parameters combined with form-data / x-www-form-urlencoded
    parameters, i.e. ``request.args`` and ``request.form`` combined. For a
    duplicated key the query parameter wins and the first occurrence wins.
    """

    params_json: dict = None
    r"""
    JSON parameters of the API call, taken from the POST body or from the
    ``params`` query parameter. When both are supplied the POST body wins;
    the two are not merged.
    """

    login_user: Optional[LoginedUserInfo] = None
    r"""
    Identity of the caller, built from ``g.user_info``.
    """

    resp_cookie: HttpCookie = None
    r"""
    Cookie to set on the response. An API that needs to set a cookie
    assigns this attribute.
    """

    is_login: bool = False
    r"""
    Whether the caller is logged in, derived from ``login_user``.
    """

    # Platform-specific helpers.
    _iam_domain: str = None
    r"""
    iam_domain used when building ``iam_utils``. Subclasses implement
    ``get_iam_domain`` to initialise this attribute. Example:

        def get_iam_domain(self) -> str:
            raise AppRuntimeException(
                message="TODO:get_iam_domain is To Implement",
                detail="TODO:get_iam_domain is To Implement"
            )
    """

    _iam_utils: IamUtils = None

    _config_center_domain: str = None
    r"""
    config_center_domain used when building ``configcenter_utils``.
    Subclasses implement ``get_config_center_domain`` to initialise this
    attribute. Example:

        def get_config_center_domain(self) -> str:
            raise AppRuntimeException(
                message="TODO:get_config_center_domain is To Implement",
                detail="TODO:get_config_center_domain is To Implement"
            )
    """

    _configcenter_utils: ConfigCenterUtils = None

    def __init__(self, a_request=None):
        r"""
        Initialise the service for one request and resolve the caller's
        identity and login state.

        :param a_request: request to handle; defaults to the current flask
            ``request``
        """
        # Initialise the common application config.
        self.common_app_config = CommonAppConfig()
        self.refresh_app_config()

        a_request = a_request or request

        # Initialise the logger used by this service instance.
        # NOTE(review): a uniquely named logger is requested for every
        # instance; if gen_logger goes through logging.getLogger, those
        # loggers are kept by the logging module - confirm this is intended.
        random_str = ''.join(random.choice(string.digits + string.ascii_letters) for _ in range(16))
        # NOTE(review): log_level is read from a fresh CommonAppConfig()
        # rather than from self.common_app_config refreshed above - verify
        # that both see the same values.
        self.logger = UtilityV2.gen_logger(
            logger_name=threading.current_thread().name + "-" + random_str,
            log_level=CommonAppConfig().log_level,
        )

        # Debug mode is decided by the common application config.
        self.is_debug = self.common_app_config.is_debug

        # Shared HTTP helper.
        self.http_utils: HttpUtils = HttpUtils(incoming_request=a_request, logger=self.logger)

        # The request handled by this instance.
        self.request = a_request

        # Query + form parameters flattened into a plain dict.
        self.request_values = self.request.values.to_dict()

        # request.json is only populated for Content-Type application/json
        # and raises when the body is empty, so a non-raising wrapper is used.
        self.request_json = self.gen_request_json_content()

        # params_json: the body JSON wins; otherwise the `params` query argument.
        if self.request_json != {}:
            self.params_json = self.request_json
        else:
            params_in_query = self.request.args.get('params')
            if params_in_query:
                self.params_json = self.app_utils.jsonstr2dict(params_in_query)
        if self.params_json is None:
            self.params_json = dict()

        # Identity of the caller.
        if hasattr(g, "user_info") and isinstance(g.user_info, dict):
            self.login_user: LoginedUserInfo = SsoUtils(
                sso_api="dontcare", logger=self.logger
            ).gen_user_info_from_sso_api_response(
                action="从用户信息字典中获取用户信息",
                user_info_dict=g.user_info,
                # Read the cookie from the request actually being handled, so
                # that an explicitly supplied a_request is honoured.
                access_token=self.request.cookies.get("accessToken"),
            )
        else:
            self.login_user: Optional[LoginedUserInfo] = None

        self.is_login = self.login_user is not None and self.login_user.is_login()

    def do_service(self):
        r"""
        Generic service execution template.

        Logs the start and end of the call with the elapsed time, commits
        and closes the connection, and turns exceptions into responses.
        Override ``my_do_service()`` to implement the business logic.

        :return: the response produced by ``handle_ret_obj`` or by one of
            the exception handlers
        """
        start_time = datetime.datetime.now()
        try:
            # Log the start of the call.
            self.log_start_call_info()

            # Obtain the service result.
            ret_obj = self.gen_ret_obj()

            # Elapsed time in milliseconds.
            end_time = datetime.datetime.now()
            consume_ms = (end_time - start_time).total_seconds() * 1000

            # Log the end of the call.
            self.log_end_call_info(consume_ms=consume_ms, ret_obj=ret_obj)

            # Commit and close the connection, if any.
            self.try_commit_and_close_conn()

            # Build the response.
            resp = self.handle_ret_obj(ret_obj)
            if isinstance(resp, Response):
                if self.resp_cookie:
                    # Set the cookie requested by the service.
                    self.resp_cookie.do_set_cookie(resp)
                resp.headers["Connection"] = "close"
            return resp
        except AppException as safe_exception:
            # NOTE(review): the failure paths also go through
            # try_commit_and_close_conn (commit, not rollback) - confirm
            # this is the intended transaction behaviour.
            self.try_commit_and_close_conn()
            return self.handle_app_exception(safe_exception, start_time)
        except BaseException as e:
            # Capture the traceback before any further call can replace it.
            exception_tracback = traceback.format_exc()
            self.try_commit_and_close_conn()
            return self.handle_common_exception(e, exception_tracback, start_time)

    def log_start_call_info(self):
        r"""
        Log the start of the call: caller, service class, start time,
        request values, request JSON and the request method and path.
        """
        user_name = self.login_user.user_name if self.login_user else None
        user_id = self.login_user.user_id if self.login_user else None
        self.logger.info(self.app_utils.join_str_with_none(
            f"{user_name}({user_id}) 服务调用:",
            self.__class__.__name__,
            " 开始处理时间:",
            self.app_utils.now_daytime(),
            " values=",
            self.app_utils.dict2jsonstr(self.request_values),
            " json=",
            self.app_utils.dict2jsonstr(self.request_json),
            f" 访问路径:{self.request.method} {self.request.path}",
        ))

    def log_end_call_info(self, consume_ms: float, e=None, exception_tracback=None, ret_obj=None):
        r"""
        Log the end of the call.

        :param consume_ms: elapsed time in milliseconds
        :param e: the exception that ended the call, or None on success
        :param exception_tracback: formatted traceback, logged when ``e`` is given
        :param ret_obj: the service result, logged when ``e`` is None
        """
        user_name = self.login_user.user_name if self.login_user else None
        user_id = self.login_user.user_id if self.login_user else None

        if e is None:
            # Render the result as text for the log line.
            if isinstance(ret_obj, (AppResponse, str)):
                ret_str = str(ret_obj)
            elif isinstance(ret_obj, dict):
                ret_str = self.app_utils.dict2jsonstr(ret_obj)
            elif isinstance(ret_obj, list):
                ret_str = self.app_utils.list2jsonstr(ret_obj)
            else:
                ret_str = ret_obj
            self.logger.info(self.app_utils.join_str_with_none(
                f"{user_name}({user_id}) 服务调用:",
                self.__class__.__name__,
                f" 正常结束,耗时 {consume_ms} 豪秒,返回内容:{ret_str}"
            ))
        else:
            self.logger.error(
                self.app_utils.join_str_with_none(
                    f"{user_name}({user_id}) 服务调用:",
                    self.__class__.__name__,
                    f" 异常结束,耗时 {consume_ms} 豪秒"
                )
            )
            self.logger.error(self.join_str(self.__class__.__name__, f'服务发生异常:{e.__class__.__name__}:{e}'))
            self.logger.error(exception_tracback)

    def gen_ret_obj(self):
        r"""
        Produce the service result.

        When login is required and the caller is not logged in, the result
        of ``action_when_not_login()`` is returned; otherwise the business
        logic in ``my_do_service()`` runs.

        :return: the service result object
        """
        if self.need_login and not self.is_login:
            return self.action_when_not_login()
        # Run the business logic.
        return self.my_do_service()

    def handle_ret_obj(self, ret_obj):
        r"""
        Convert the service result into a response.

        ``AppResponse``, ``str``, ``dict`` and ``list`` results are wrapped
        with ``make_response``; anything else is returned unchanged.

        :param ret_obj: the service result
        :return: the response object
        """
        if isinstance(ret_obj, AppResponse):
            resp = make_response(str(ret_obj), int(ret_obj.status_code))
        elif isinstance(ret_obj, str):
            resp = make_response(ret_obj)
        elif isinstance(ret_obj, dict):
            resp = make_response(self.app_utils.dict2jsonstr(ret_obj))
        elif isinstance(ret_obj, list):
            resp = make_response(self.app_utils.list2jsonstr(ret_obj))
        else:
            resp = ret_obj
        return resp

    def my_do_service(self):
        r"""
        Business logic hook called by ``do_service()``; override it.

        :raises AppRuntimeException: always, in the base class
        """
        raise AppRuntimeException(
            message="TODO:my_do_service is To Implement",
            detail="TODO:my_do_service is To Implement"
        )

    def try_commit_and_close_conn(self):
        r"""
        Best-effort commit and close of the service connection.

        Does nothing when no connection was ever created. Failures are
        logged and never propagated.
        """
        if self._conn is None:
            # No connection was opened during this request.
            return
        try:
            BaseDbService(logger=self.logger).commit(conn=self._conn, close_conn_after_exec=True)
        except BaseException as e:
            # Deliberately best-effort: record the failure and keep going.
            if self.logger is not None:
                self.logger.warning(
                    "try_commit_and_close_conn failed: %s: %s", e.__class__.__name__, e
                )

    def gen_ok_resp(self, data=None):
        r"""
        Build an OK response.

        :param data: payload of the response
        :return: an ``AppResponse`` with ``CODE_OK``
        """
        return AppResponse(code=ConstResponseCode.CODE_OK, data=data)

    def gen_sys_error_resp(self, data=None):
        r"""
        Build a system-error response.

        :param data: payload of the response
        :return: an ``AppResponse`` with ``CODE_SYS_ERROR``
        """
        return AppResponse(code=ConstResponseCode.CODE_SYS_ERROR, data=data)

    def gen_not_login_err(self) -> AppResponse:
        r"""
        Build a not-logged-in response.

        :return: an ``AppResponse`` with ``CODE_AUTH_FAILURE``
        """
        return AppResponse(code=ConstResponseCode.CODE_AUTH_FAILURE)

    def raise_not_login_err(self):
        r"""
        Raise a not-logged-in error.

        :raises AppException: with ``CODE_AUTH_FAILURE``
        """
        raise AppException(code=ConstResponseCode.CODE_AUTH_FAILURE)

    def gen_no_auth_err(self, message: str = None) -> AppResponse:
        r"""
        Build a missing-permission response.

        :param message: optional error message
        :return: an ``AppResponse`` with ``CODE_UNAUTHORIZE_OPERATION``
        """
        return AppResponse(code=ConstResponseCode.CODE_UNAUTHORIZE_OPERATION, message=message)

    def raise_no_auth_err(self, message: str = None):
        r"""
        Raise a missing-permission error.

        :param message: optional error message
        :raises AppException: with ``CODE_UNAUTHORIZE_OPERATION``
        """
        raise AppException(code=ConstResponseCode.CODE_UNAUTHORIZE_OPERATION, message=message)

    def gen_missing_param_err(self, message: str = None):
        r"""
        Build a missing-parameter response.

        :param message: optional error message
        :return: an ``AppResponse`` with ``CODE_MISSING_PARAMETER``
        """
        return AppResponse(code=ConstResponseCode.CODE_MISSING_PARAMETER, message=message)

    def gen_status_err(self, message: str = None):
        r"""
        Build a status-error response.

        :param message: optional error message
        :return: an ``AppResponse`` with ``CODE_STATUS_ERROR``
        """
        return AppResponse(code=ConstResponseCode.CODE_STATUS_ERROR, message=message)

    def raise_missing_param_err(self, message: str = None):
        r"""
        Raise a missing-parameter error.

        :param message: optional error message
        :raises AppException: with ``CODE_MISSING_PARAMETER``
        """
        raise AppException(code=ConstResponseCode.CODE_MISSING_PARAMETER, message=message)

    def raise_status_err(self, message: str = None):
        r"""
        Raise a status error.

        :param message: optional error message
        :raises AppException: with ``CODE_STATUS_ERROR``
        """
        raise AppException(code=ConstResponseCode.CODE_STATUS_ERROR, message=message)

    def join_str(self, *to_join_list: str, separator_str: str = '', wrapper_str: str = None,
                 ignore_none: bool = False) -> str:
        r"""
        Join the given values into one string by delegating to
        ``app_utils.list_join_to_str``.

        Per the original description, a None value is added as the string
        "null" without ``wrapper_str`` (behaviour lives in the delegate).

        :param to_join_list: values to join
        :param separator_str: separator placed between values
        :param wrapper_str: string wrapped around each value
        :param ignore_none: passed through to the delegate
        :return: the joined string
        """
        return self.app_utils.list_join_to_str(
            to_join_list,
            separator_str=separator_str,
            wrapper_str=wrapper_str,
            ignore_none=ignore_none,
        )

    def resp_for_safe_exeception(self, safe_exception: AppException):
        r"""
        Build the response returned for an ``AppException``.

        :param safe_exception: the application exception being handled
        :return: a response carrying the exception's error JSON and status code
        """
        return make_response(self.app_utils.dict2jsonstr(safe_exception.gen_err()), int(safe_exception.status_code))

    def handle_app_exception(self, safe_exception: AppException, start_time):
        r"""
        Handle an application exception raised inside ``do_service``.

        :param safe_exception: the application exception being handled
        :param start_time: time at which the call started
        :return: the error response, with ``Connection: close`` set
        """
        # Elapsed time in milliseconds.
        end_time = datetime.datetime.now()
        consume_ms = (end_time - start_time).total_seconds() * 1000

        # Log the end of the call together with the error payload.
        ret_obj = self.app_utils.dict2jsonstr(safe_exception.gen_err())
        self.log_end_call_info(consume_ms=consume_ms, ret_obj=ret_obj)

        # Build the error response through the overridable hook, mirroring
        # how handle_common_exception uses resp_for_common_exeception.
        resp = self.resp_for_safe_exeception(safe_exception)
        resp.headers["Connection"] = "close"
        return resp

    def resp_for_common_exeception(self, e):
        r"""
        Build the response returned for a non-``AppException`` error.

        ``AppRuntimeException`` and ``OperationResultException`` contribute
        their own message and code; any other exception is reported as a
        system error with "<class name>:<message>".

        :param e: the exception being handled
        :return: a response built from an ``AppResponse``
        """
        err_msg = f"{e.__class__.__name__}:{e}"
        code = ConstResponseCode.CODE_SYS_ERROR

        if isinstance(e, AppRuntimeException):
            self.logger.error(e)
            err_msg = e.message
            code = e.code or ConstResponseCode.CODE_SYS_ERROR

        if isinstance(e, OperationResultException):
            self.logger.error(e)
            err_msg = e.failed_reason
            code = e.code or ConstResponseCode.CODE_SYS_ERROR

        response = AppResponse(
            code=code,
            message=err_msg,
        )
        return make_response(str(response), int(response.status_code))

    def handle_common_exception(self, e, exception_tracback, start_time):
        r"""
        Handle a generic exception raised inside ``do_service``.

        :param e: the exception being handled
        :param exception_tracback: formatted traceback of the exception
        :param start_time: time at which the call started
        :return: the error response, with ``Connection: close`` set
        """
        # Elapsed time in milliseconds.
        end_time = datetime.datetime.now()
        consume_ms = (end_time - start_time).total_seconds() * 1000

        # Log the end of the call together with the exception.
        self.log_end_call_info(consume_ms=consume_ms, e=e, exception_tracback=exception_tracback)

        resp = self.resp_for_common_exeception(e)
        resp.headers["Connection"] = "close"
        return resp

    def action_when_not_login(self):
        r"""
        Result used by ``do_service`` when login is required but the caller
        is not logged in. Defaults to the not-logged-in response.

        :return: the result of ``gen_not_login_err()``
        """
        return self.gen_not_login_err()

    def raise_uims_when_not_login(self):
        r"""
        Raise an auth-failure exception whose response carries the uims
        return address (an empty string in the base class).

        :raises AppException: always
        """
        uims_api_for_frontpage = ""

        response: AppResponse = AppResponse()
        response.code = ConstResponseCode.CODE_AUTH_FAILURE
        response.message = "请先登录uims并通过uims跳转到天宫后重试"
        response.data = uims_api_for_frontpage

        app_exception = AppException()
        app_exception.my_response = response
        raise app_exception

    def gen_request_json_content(self):
        r"""
        Parse the request body as JSON without ever raising.

        :return: the parsed content, or an empty dict when the body is
            empty, parses to a falsy value, or cannot be parsed
        """
        try:
            raw_body = self.request.data
            if not raw_body:
                # Body-less requests are normal; skip parsing so they do not
                # produce an error log entry.
                return dict()
            return self.app_utils.jsonstr2dict(str(raw_body, encoding="utf-8")) or dict()
        except BaseException as e:
            # Log the failure and fall back to an empty dict.
            self.logger.error("获取self.request.json时发生异常,使用空的dict,异常信息如下:")
            self.logger.exception(e, exc_info=True)
            return dict()

    def gen_user_info(self, login_user: LoginedUserInfo = None, output_type: str = "str") -> Union[str, dict]:
        r"""
        Build a {"user_id":"","user_name":""} style dict or string by
        delegating to ``app_utils.gen_user_info``.

        :param login_user: the user; defaults to ``self.login_user``
        :param output_type: "str" or "dict"; defaults to "str"
        :return: the user info in the requested format
        """
        login_user = login_user or self.login_user
        return self.app_utils.gen_user_info(login_user=login_user, output_type=output_type)

    def gen_app_db_conn(self) -> Connection:
        r"""
        Create the database connection of the service; override it.

        :raises AppRuntimeException: always, in the base class
        """
        raise AppRuntimeException(
            message="TODO:gen_app_db_conn is To Implement",
            detail="TODO:gen_app_db_conn is To Implement"
        )

    @property
    def conn(self) -> Connection:
        r"""
        The service connection, created on first access through
        ``gen_app_db_conn()``.

        :return: the connection
        """
        if self._conn is None:
            self._conn = self.gen_app_db_conn()
        return self._conn

    @conn.setter
    def conn(self, conn):
        r"""
        Set the service connection.

        :param conn: the connection to use
        """
        self._conn = conn

    def get_iam_domain(self) -> str:
        r"""
        Provide the IAM domain; override it.

        :raises AppRuntimeException: always, in the base class
        """
        raise AppRuntimeException(
            message="TODO:get_iam_domain is To Implement",
            detail="TODO:get_iam_domain is To Implement"
        )

    @property
    def iam_domain(self) -> str:
        r"""
        The IAM domain, resolved through ``get_iam_domain()`` while unset
        or empty.

        :return: the IAM domain
        """
        if not self._iam_domain:
            self._iam_domain = self.get_iam_domain()
        return self._iam_domain

    @iam_domain.setter
    def iam_domain(self, iam_domain):
        r"""
        Set the IAM domain.

        :param iam_domain: the IAM domain to use
        """
        self._iam_domain = iam_domain

    @property
    def iam_utils(self) -> IamUtils:
        r"""
        The IAM helper, created on first access from ``iam_domain`` and the
        caller's access token (empty string when there is no login user).

        :return: the IAM helper
        """
        if self._iam_utils is None:
            self._iam_utils: IamUtils = IamUtils(
                iam_api=self.iam_domain + "/iam",
                access_token=self.login_user.access_token if self.login_user else MyStringUtils.EMPTY,
                http_utils=self.http_utils,
                app_utils=self.app_utils
            )
        return self._iam_utils

    def get_config_center_domain(self) -> str:
        r"""
        Provide the config-center domain; override it.

        :raises AppRuntimeException: always, in the base class
        """
        raise AppRuntimeException(
            message="TODO:get_config_center_domain is To Implement",
            detail="TODO:get_config_center_domain is To Implement"
        )

    @property
    def config_center_domain(self) -> str:
        r"""
        The config-center domain, resolved through
        ``get_config_center_domain()`` while unset or empty.

        :return: the config-center domain
        """
        if not self._config_center_domain:
            self._config_center_domain = self.get_config_center_domain()
        return self._config_center_domain

    @config_center_domain.setter
    def config_center_domain(self, config_center_domain):
        r"""
        Set the config-center domain.

        :param config_center_domain: the config-center domain to use
        """
        self._config_center_domain = config_center_domain

    @property
    def configcenter_utils(self) -> ConfigCenterUtils:
        r"""
        The config-center helper, created on first access from
        ``config_center_domain`` and the caller's access token (empty
        string when there is no login user).

        :return: the config-center helper
        """
        if self._configcenter_utils is None:
            self._configcenter_utils: ConfigCenterUtils = ConfigCenterUtils(
                configcenter_api=self.config_center_domain + "/configcenter",
                access_token=self.login_user.access_token if self.login_user else MyStringUtils.EMPTY,
                http_utils=self.http_utils,
                app_utils=self.app_utils
            )
        return self._configcenter_utils

    @property
    def app_utils(self) -> UtilityV2:
        r"""
        The utility instance, created on first access as
        ``UtilityV2(logger=self.logger)``.

        :return: the utility instance
        """
        if self._app_utils is None:
            self._app_utils = UtilityV2(logger=self.logger)
        return self._app_utils

    @app_utils.setter
    def app_utils(self, app_utils):
        r"""
        Set the utility instance.

        :param app_utils: the utility instance to use
        """
        self._app_utils = app_utils

    @property
    def app_config(self):
        r"""
        The application configuration.

        :return: the application configuration
        :raises AppRuntimeException: when no configuration has been set
        """
        if self._app_config is None:
            raise AppRuntimeException(
                message="TODO:app_config is To Implement",
                detail="TODO:app_config is To Implement"
            )
        return self._app_config

    @app_config.setter
    def app_config(self, app_config):
        r"""
        Set the application configuration.

        :param app_config: the application configuration to use
        """
        self._app_config = app_config

    def get_app_product_code(self) -> str:
        r"""
        Provide the product code used for IAM authorisation; override it.

        :raises AppRuntimeException: always, in the base class
        """
        raise AppRuntimeException(
            message="TODO:get_app_product_code is To Implement",
            detail="TODO:get_app_product_code is To Implement"
        )

    @property
    def app_product_code(self) -> str:
        r"""
        The product code, resolved through ``get_app_product_code()`` on
        first access.

        :return: the product code
        """
        if self._app_product_code is None:
            self._app_product_code = self.get_app_product_code()
        return self._app_product_code

    @app_product_code.setter
    def app_product_code(self, app_product_code):
        r"""
        Set the product code.

        :param app_product_code: the product code to use
        """
        self._app_product_code = app_product_code

    def refresh_app_config(self):
        r"""
        Refresh ``common_app_config`` and the application configuration.

        By default ``common_app_config`` is refreshed on every call from
        ``ConstBaseApp.COMMON_APP_CONFIG_DEFAULT_CONF_FILE_PATH``. A subclass
        that needs to re-read its own application configuration every time
        overrides this method. Example:

            def refresh_app_config(self):
                hot_reload_from_env = os.environ.get('HOT_RELOAD_CONFIG')
                hot_reload = hot_reload_from_env == "true"
                if hot_reload:
                    # refresh common_app_config from the application's config file
                    self.common_app_config.refresh_app_config(conf_filepath=MyAppConfig.get_app_conf_filepath())
                    # refresh app_config from the application's config file
                    self.app_config.refresh_app_config(conf_filepath=MyAppConfig.get_app_conf_filepath())

        or

            def refresh_app_config(self):
                # keep refreshing common_app_config from
                # ConstBaseApp.COMMON_APP_CONFIG_DEFAULT_CONF_FILE_PATH
                super().refresh_app_config()
                hot_reload_from_env = os.environ.get('HOT_RELOAD_CONFIG')
                hot_reload = hot_reload_from_env == "true"
                if hot_reload:
                    # refresh app_config from the application's config file
                    self.app_config.refresh_app_config(conf_filepath=MyAppConfig.get_app_conf_filepath())
        """
        # Refresh common_app_config from the default common config file.
        self.common_app_config.refresh_app_config(ConstBaseApp.COMMON_APP_CONFIG_DEFAULT_CONF_FILE_PATH)