#!/usr/bin/env python
# -*- encoding: utf-8 -*-
"""
@Version: 1.0
@Python Version:3.6.6
@Author: ludq1
@Email: ludq1@chinaunicom.cn
@date: 2023/04/07 11:40:00
@Description: Base database-service helpers built on PyMySQL connections.
"""
import logging
from typing import Optional

from pymysql.connections import Connection
from pymysql.cursors import DictCursor

from .app_exception import AppRuntimeException
from .common_app_config import CommonAppConfig
from ..globalutility import Utility
from ..my_stringutils import MyStringUtils


class QueryResultWithFoundRows:
    r"""
    Holder for a query's rows together with a ``FOUND_ROWS()`` total.

    Both attributes are read from keyword arguments and default to None when absent.
    """
    result_list: Optional[list] = None
    found_rows: Optional[int] = None

    def __init__(self, **kwargs):
        self.result_list = kwargs.get("result_list")
        self.found_rows = kwargs.get("found_rows")

    def __str__(self):
        return Utility.dict2jsonstr(self.__dict__)


class DbBehaviourAfterExec:
    r"""
    Describes what to do with the connection once a statement has been executed.
    Does not depend on any other application base class.

    Defaults: commit_after_exec, close_conn_after_exec, close_conn_when_exception and
    rollback_when_exception are True; commit_when_exception is False.

    rollback_when_exception takes priority over commit_when_exception.
    When close_conn_when_exception is True and both rollback_when_exception and
    commit_when_exception are False, a rollback is performed anyway.
    """
    commit_after_exec: bool = True
    close_conn_after_exec: bool = True
    close_conn_when_exception: bool = True
    rollback_when_exception: bool = True
    commit_when_exception: bool = False

    def __init__(self, commit_after_exec: bool = True, close_conn_after_exec: bool = True,
                 close_conn_when_exception: bool = True, rollback_when_exception: bool = True,
                 commit_when_exception: bool = False):
        r"""
        Initialise the behaviour flags.

        :param commit_after_exec: commit after a successful execution
        :param close_conn_after_exec: close the connection after a successful execution
        :param close_conn_when_exception: close the connection when execution failed
        :param rollback_when_exception: roll back when execution failed
        :param commit_when_exception: commit when execution failed (only if no rollback is done)
        """
        self.commit_after_exec = commit_after_exec
        self.close_conn_after_exec = close_conn_after_exec
        self.close_conn_when_exception = close_conn_when_exception
        self.rollback_when_exception = rollback_when_exception
        self.commit_when_exception = commit_when_exception

    @classmethod
    def gen_no_commit_close_behaviour(cls) -> "DbBehaviourAfterExec":
        r"""
        Build a behaviour that neither commits nor closes on success,
        and rolls back and closes on failure.

        :return: a new DbBehaviourAfterExec
        """
        return DbBehaviourAfterExec(
            commit_after_exec=False,
            close_conn_after_exec=False,
            close_conn_when_exception=True,
            rollback_when_exception=True,
            commit_when_exception=False)


class BaseDbService:
    r"""
    Base db-service class providing basic helpers.
    Does not depend on any other application base class.
    """

    def __init__(self, logger: logging.Logger = None):
        self.logger = logger or CommonAppConfig().common_logger

    def join_str(self, *to_join_list: str, separator_str: str = '', wrapper_str: str = None,
                 ignore_none: bool = False) -> str:
        r"""
        Join the given values into one string by delegating to Utility.list_join_to_str.

        Per the original author's note, a None value is appended as the string "null"
        without wrapper_str around it (behaviour lives in Utility; verify there).

        :param to_join_list: values to join
        :param separator_str: separator, empty string by default
        :param wrapper_str: optional string wrapped around each value
        :param ignore_none: forwarded to Utility.list_join_to_str, False by default
        :return: the joined string
        """
        return Utility.list_join_to_str(to_join_list, separator_str=separator_str, wrapper_str=wrapper_str,
                                        ignore_none=ignore_none)

    def do_filter_mysql_param_with_single_quotes(self, to_filter_param: str, append_fix: str = "'") -> str:
        r"""
        MySQL anti-injection helper: replace special characters in a value that is going to be
        concatenated into a statement such as `key`='value'. Delegates to Utility.do_filter_mysql_param.

        :param to_filter_param: the raw value
        :param append_fix: forwarded to the Utility helper, a single quote by default
        :return: the filtered value
        """
        return Utility.do_filter_mysql_param(to_filter_param=to_filter_param, append_fix=append_fix)

    def do_filter_mysql_param_with_single_quotes_for_fuzzy_search(self, to_filter_param: str,
                                                                  append_fix: str = "'") -> str:
        r"""
        MySQL anti-injection helper for values used in fuzzy searches. Delegates to
        Utility.do_filter_mysql_param_with_single_quotes_for_fuzzy_search.
        NOTE(review): presumably this also escapes LIKE wildcards - confirm in Utility.

        :param to_filter_param: the raw value
        :param append_fix: forwarded to the Utility helper, a single quote by default
        :return: the filtered value
        """
        return Utility.do_filter_mysql_param_with_single_quotes_for_fuzzy_search(to_filter_param=to_filter_param,
                                                                                 append_fix=append_fix)

    def do_filter_mysql_param_list_with_single_quotes(
            self,
            to_filter_param_list: list,
            append_fix: str = "'"
    ) -> Optional[list]:
        r"""
        Apply do_filter_mysql_param_with_single_quotes to every element of a list.

        Implemented here because, per the original author's note,
        Utility.do_filter_mysql_param_list_with_single_quotes has a bug.

        :param to_filter_param_list: raw values, may be None
        :param append_fix: forwarded for every element, a single quote by default
        :return: a new list of filtered values, or None when the input is None
        """
        if to_filter_param_list is None:
            return None
        return [self.do_filter_mysql_param_with_single_quotes(tmp_key, append_fix=append_fix)
                for tmp_key in to_filter_param_list]

    def gen_default_db_behaviour_after_exec(self) -> DbBehaviourAfterExec:
        r"""
        Build the default behaviour: commit and close on success, roll back and close on failure.

        :return: a new DbBehaviourAfterExec
        """
        return DbBehaviourAfterExec(
            commit_after_exec=True,
            close_conn_after_exec=True,
            close_conn_when_exception=True,
            rollback_when_exception=True,
            commit_when_exception=False)

    @staticmethod
    def _close_conn_quietly(conn: Connection) -> None:
        r"""
        Best-effort close, used only while another error is already propagating.

        :param conn: connection to close
        """
        try:
            conn.close()
        except Exception:
            # Deliberately ignored: the error that is already propagating is the one the caller
            # needs to see, and closing an already-closed connection raises as well.
            pass

    def commit(self, conn: Connection, close_conn_after_exec: bool = True,
               db_behaviour_after_exec: DbBehaviourAfterExec = None) -> bool:
        r"""
        Commit the connection and optionally close it.

        When db_behaviour_after_exec is None the default behaviour is used (see
        gen_default_db_behaviour_after_exec), whose close_conn_after_exec is True.
        The connection is closed when EITHER close_conn_after_exec OR the behaviour's
        close_conn_after_exec is true, so to keep the connection open both must be false.

        :param conn: connection to commit; None is a no-op
        :param close_conn_after_exec: close the connection after the commit
        :param db_behaviour_after_exec: behaviour whose close_conn_after_exec is also honoured
        :return: always True
        """
        if conn is None:
            return True

        if db_behaviour_after_exec is None:
            db_behaviour_after_exec = self.gen_default_db_behaviour_after_exec()
        should_close = close_conn_after_exec or db_behaviour_after_exec.close_conn_after_exec

        try:
            conn.commit()
        except BaseException:
            # Do not leak the connection when the commit itself fails.
            if should_close:
                self._close_conn_quietly(conn)
            raise

        if should_close:
            conn.close()
        return True

    def rollback(self, conn: Connection, close_conn_after_exec: bool = True,
                 db_behaviour_after_exec: DbBehaviourAfterExec = None) -> bool:
        r"""
        Roll back the connection and optionally close it.

        When db_behaviour_after_exec is None the default behaviour is used (see
        gen_default_db_behaviour_after_exec), whose close_conn_after_exec is True.
        The connection is closed when EITHER close_conn_after_exec OR the behaviour's
        close_conn_after_exec is true, so to keep the connection open both must be false.

        :param conn: connection to roll back; None is a no-op
        :param close_conn_after_exec: close the connection after the rollback
        :param db_behaviour_after_exec: behaviour whose close_conn_after_exec is also honoured
        :return: always True
        """
        if conn is None:
            return True

        if db_behaviour_after_exec is None:
            db_behaviour_after_exec = self.gen_default_db_behaviour_after_exec()
        should_close = close_conn_after_exec or db_behaviour_after_exec.close_conn_after_exec

        try:
            conn.rollback()
        except BaseException:
            # Do not leak the connection when the rollback itself fails.
            if should_close:
                self._close_conn_quietly(conn)
            raise

        if should_close:
            conn.close()
        return True

    def close_conn(self, conn: Connection):
        r"""
        Close the connection.

        :param conn: connection to close; None is a no-op
        """
        if conn is None:
            return
        conn.close()

    def do_after_exec(self, conn: Connection, result, encountered_exception,
                      db_behaviour_after_exec: DbBehaviourAfterExec):
        r"""
        Apply the post-execution behaviour, dispatching on whether an exception was recorded.

        When db_behaviour_after_exec is None the default behaviour is used:
        commit and close on success, roll back and close on failure.

        :param conn: connection the statement ran on
        :param result: value to return on success
        :param encountered_exception: exception recorded during execution, falsy when there was none
        :param db_behaviour_after_exec: behaviour flags, may be None
        :return: result when no exception was recorded
        :raises BaseException: encountered_exception is re-raised when one was recorded
        """
        if db_behaviour_after_exec is None:
            db_behaviour_after_exec = self.gen_default_db_behaviour_after_exec()

        if not encountered_exception:
            return self.do_after_exec_without_exception(conn, result, db_behaviour_after_exec)
        return self.do_after_exec_with_exception(conn, encountered_exception, db_behaviour_after_exec)

    def do_after_exec_without_exception(self, conn: Connection, result,
                                        db_behaviour_after_exec: DbBehaviourAfterExec):
        r"""
        Apply the post-execution behaviour for a successful execution.

        When db_behaviour_after_exec is None the default behaviour is used:
        commit and close on success.

        :param conn: connection the statement ran on; None skips commit and close
        :param result: value to return
        :param db_behaviour_after_exec: behaviour flags, may be None
        :return: result, unchanged
        """
        if db_behaviour_after_exec is None:
            db_behaviour_after_exec = self.gen_default_db_behaviour_after_exec()

        if conn is None:
            return result

        should_commit = bool(db_behaviour_after_exec.commit_after_exec)
        should_close = bool(db_behaviour_after_exec.close_conn_after_exec)

        try:
            if should_commit:
                conn.commit()
        except BaseException:
            # The caller asked for the connection to be closed; honour that even though
            # the commit failed, then let the commit error propagate.
            if should_close:
                self._close_conn_quietly(conn)
            raise

        if should_close:
            conn.close()
        return result

    def do_after_exec_with_exception(self, conn: Connection, encountered_exception,
                                     db_behaviour_after_exec: DbBehaviourAfterExec):
        r"""
        Apply the post-execution behaviour for a failed execution, then re-raise the failure.

        When db_behaviour_after_exec is None the default behaviour is used:
        roll back and close on failure.

        :param conn: connection the statement ran on; None skips rollback/commit/close
        :param encountered_exception: the exception recorded during execution
        :param db_behaviour_after_exec: behaviour flags, may be None
        :raises BaseException: always; normally encountered_exception itself. If the
            rollback/commit fails, that error is raised with encountered_exception as its cause.
        """
        if db_behaviour_after_exec is None:
            db_behaviour_after_exec = self.gen_default_db_behaviour_after_exec()

        should_close = bool(db_behaviour_after_exec.close_conn_when_exception)
        should_rollback = bool(db_behaviour_after_exec.rollback_when_exception)
        should_commit = bool(db_behaviour_after_exec.commit_when_exception)

        if should_close and not should_rollback and not should_commit:
            # Closing on failure with neither rollback nor commit requested: default to rollback.
            should_rollback = True

        if conn is not None:
            try:
                if should_rollback:
                    conn.rollback()
                elif should_commit:
                    conn.commit()
            except BaseException as cleanup_error:
                # Still release the connection, and keep the original failure visible
                # as the cause of the cleanup error.
                if should_close:
                    self._close_conn_quietly(conn)
                raise cleanup_error from encountered_exception

            if should_close:
                conn.close()

        raise encountered_exception

    def querydb(
            self,
            conn: Connection,
            query_str: str,
            factory_method=None,
            db_behaviour_after_exec: DbBehaviourAfterExec = None
    ) -> list:
        r"""
        Run a query and return all rows (fetched with a DictCursor).

        When db_behaviour_after_exec is None the default behaviour is used:
        commit and close on success, roll back and close on failure.

        :param conn: connection to run the query on
        :param query_str: complete SQL text; it is executed as-is, so callers must escape values
        :param factory_method: optional callable applied to every row of a non-empty result
        :param db_behaviour_after_exec: behaviour flags, may be None
        :return: list of rows, or of factory_method results; empty list when nothing matched
        :raises AppRuntimeException: when conn is None
        :raises BaseException: any error raised while executing or converting rows
        """
        if conn is None:
            raise AppRuntimeException(message="未提供conn", detail="执行数据库操作时,传入的conn为None")

        encountered_exception: Optional[BaseException] = None
        result: Optional[list] = None

        try:
            self.logger.debug(f"执行sql语句:{query_str}")
            # The cursor's context manager closes it on exit, no explicit close needed.
            with conn.cursor(cursor=DictCursor) as cursor:
                cursor.execute(query_str)
                result = cursor.fetchall() or list()
            self.logger.debug(f"执行sql语句:{query_str} 返回结果: {result}")
        except BaseException as e:
            # Recorded rather than raised so the connection clean-up below always runs.
            encountered_exception = e
            self.logger.debug(f"执行sql语句:{query_str} 发生异常: {e}")

        if not encountered_exception and factory_method is not None and result:
            try:
                result = [factory_method(tmpDict) for tmpDict in result]
            except BaseException as e:
                encountered_exception = e

        if db_behaviour_after_exec is None:
            db_behaviour_after_exec = self.gen_default_db_behaviour_after_exec()

        return self.do_after_exec(conn, result, encountered_exception, db_behaviour_after_exec)

    def querycount_db(self, conn: Connection, query_str: str,
                      db_behaviour_after_exec: DbBehaviourAfterExec = None) -> int:
        r"""
        Run a count-style query and return the first column of the first row as an int.

        When db_behaviour_after_exec is None the default behaviour is used:
        commit and close on success, roll back and close on failure.

        :param conn: connection to run the query on
        :param query_str: complete SQL text
        :param db_behaviour_after_exec: behaviour flags, may be None
        :return: the integer value
        :raises ValueError: when the query returned no rows
        """
        if db_behaviour_after_exec is None:
            db_behaviour_after_exec = self.gen_default_db_behaviour_after_exec()

        ret_list = self.querydb(conn, query_str, db_behaviour_after_exec=db_behaviour_after_exec)

        if not ret_list:
            raise ValueError('数据库查询语句返回结果集为空')

        return int(list(ret_list[0].values())[0])

    def querydb_with_single_column_and_line(self, conn: Connection, query_str: str,
                                            db_behaviour_after_exec: DbBehaviourAfterExec = None) -> Optional[str]:
        r"""
        Run a query expected to yield one column and one row, and return that value
        converted with MyStringUtils.to_str.

        When db_behaviour_after_exec is None the default behaviour is used:
        commit and close on success, roll back and close on failure.

        :param conn: connection to run the query on
        :param query_str: complete SQL text
        :param db_behaviour_after_exec: behaviour flags, may be None
        :return: the converted value, or None when the query returned no rows
        """
        if db_behaviour_after_exec is None:
            db_behaviour_after_exec = self.gen_default_db_behaviour_after_exec()

        ret_list = self.querydb(conn, query_str, db_behaviour_after_exec=db_behaviour_after_exec)

        if not ret_list:
            return None

        return MyStringUtils.to_str(list(ret_list[0].values())[0])

    def querydb_with_sql_calc_found_rows(
            self,
            conn: Connection,
            query_str: str,
            factory_method=None,
            db_behaviour_after_exec: DbBehaviourAfterExec = None
    ) -> QueryResultWithFoundRows:
        r"""
        Run a query containing SQL_CALC_FOUND_ROWS and also fetch FOUND_ROWS().

        When db_behaviour_after_exec is None the default behaviour is used:
        commit and close on success, roll back and close on failure.

        :param conn: connection to run both statements on
        :param query_str: complete SQL text
        :param factory_method: optional callable applied to every row of the main query
        :param db_behaviour_after_exec: behaviour flags, may be None
        :return: QueryResultWithFoundRows holding the rows and the FOUND_ROWS() value
        """
        if db_behaviour_after_exec is None:
            db_behaviour_after_exec = self.gen_default_db_behaviour_after_exec()

        # The main query must leave the connection open and uncommitted on success, because the
        # follow-up FOUND_ROWS() statement runs on the same connection. The caller's failure
        # handling is kept as requested.
        tmp_db_behaviour_after_exec = DbBehaviourAfterExec.gen_no_commit_close_behaviour()
        tmp_db_behaviour_after_exec.close_conn_when_exception = db_behaviour_after_exec.close_conn_when_exception
        tmp_db_behaviour_after_exec.commit_when_exception = db_behaviour_after_exec.commit_when_exception
        tmp_db_behaviour_after_exec.rollback_when_exception = db_behaviour_after_exec.rollback_when_exception

        ret_list = self.querydb(conn, query_str, factory_method=factory_method,
                                db_behaviour_after_exec=tmp_db_behaviour_after_exec)

        # The caller's full behaviour (commit/close) is applied after this second statement.
        found_rows = self.querydb_with_single_column_and_line(
            conn=conn,
            query_str="select FOUND_ROWS() as found_rows",
            db_behaviour_after_exec=db_behaviour_after_exec
        )
        found_rows = int(found_rows)

        return QueryResultWithFoundRows(result_list=ret_list, found_rows=found_rows)

    def operate_db(self, conn: Connection, *operate_sql_list: str, sql_list: list = None,
                   db_behaviour_after_exec: DbBehaviourAfterExec = None) -> int:
        r"""
        Execute one or more statements and return the total number of affected rows.

        Statements can be supplied either positionally or through sql_list. When positional
        statements are given, sql_list is ignored.

        When db_behaviour_after_exec is None the default behaviour is used:
        commit and close on success, roll back and close on failure.

        :param conn: connection to run the statements on
        :param operate_sql_list: statements given positionally
        :param sql_list: statements given as a list, used only when none are given positionally
        :param db_behaviour_after_exec: behaviour flags, may be None
        :return: sum of the values returned by cursor.execute for every statement
        :raises AppRuntimeException: when conn is None
        :raises ValueError: when no statement was supplied (raised after the failure behaviour ran)
        """
        if conn is None:
            raise AppRuntimeException(message="未提供conn", detail="执行数据库操作时,传入的conn为None")

        encountered_exception: Optional[BaseException] = None
        result: int = 0

        try:
            # Build the statement list first; positional statements win over sql_list.
            real_sql_list = list(operate_sql_list or sql_list or ())
            if not real_sql_list:
                # Raised inside the try block on purpose so the failure behaviour
                # (rollback / close) is applied to the connection.
                raise ValueError('未提供sql语句')

            # The cursor's context manager closes it on exit, no explicit close needed.
            with conn.cursor(cursor=DictCursor) as cursor:
                for tmp_sql_str in real_sql_list:
                    self.logger.debug(f"执行sql语句:{tmp_sql_str}")
                    result += cursor.execute(tmp_sql_str)
                    self.logger.debug(f"执行sql语句:{tmp_sql_str} 返回结果: {result}")
        except BaseException as e:
            # Recorded rather than raised so the connection clean-up below always runs.
            encountered_exception = e

        if db_behaviour_after_exec is None:
            db_behaviour_after_exec = self.gen_default_db_behaviour_after_exec()

        return self.do_after_exec(conn, result, encountered_exception, db_behaviour_after_exec)