mysql_utils.py 12.3 KB
Newer Older
qunfeng qiu's avatar
qunfeng qiu committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
"""
@Version: 1.0
@Python Version:3.6.6
@Author: ludq1
@Email: ludq1@chinaunicom.cn
@date: 2023/04/07 11:40:00
@Description:
"""

from typing import Union, List, Tuple, Optional

from .globalutility import Utility
from .my_baseexception import create_base_exception
from .my_stringutils import MyStringUtils


class MysqlUtils:
    """
    mysql语句生成工具类
    """

    @classmethod
    def gen_insertsql(cls, table_name: str, a_dict: dict) -> str:
        r"""
        生成 数据库插入语句,

        map 中的值必须是字符串或None

        当 map 中的值为 None 时,填充null值,

        当 map 为 None 或empty时返回空字符串

        Args:
            table_name:
            a_dict:
        Returns:

        """

        # 判断参数
        if a_dict is None or not a_dict:
            return MyStringUtils.EMPTY

        if not isinstance(a_dict, dict):
            raise ValueError("a_dict 不是字典")

        table_name = MyStringUtils.to_str(table_name, trim=True)
        if MyStringUtils.is_empty(table_name):
            raise ValueError("table_name 不能为空")

        table_name = Utility.do_filter_mysql_field(table_name, append_fix='`')

        columns = []
        values = []
        for tmp_k, tmp_v in a_dict.items():
            tmp_k = Utility.do_filter_mysql_field(tmp_k, append_fix='`')
            tmp_v = Utility.do_filter_mysql_param(tmp_v, append_fix="'")
            columns.append(tmp_k)
            values.append(tmp_v)

        return Utility.join_str_with_none("insert ", table_name, "(",
                                          Utility.list_join_to_str(columns, separator_str=",", ignore_none=False),
                                          ") select ",
                                          Utility.list_join_to_str(values, separator_str=",", ignore_none=False))

    @classmethod
    def gen_insertsql_with_on_duplcate_key_update(cls, table_name: str, a_dict: dict) -> str:
        r"""
        生成 数据库插入语句,当值为null时,填充null值, 使用 ON DUPLICATE KEY UPDATE

        map 中的值必须是字符串或None

        当 map 中的值为 None 时,填充null值,

        当 map 为 None 或empty时返回空字符串

        Args:
            table_name:
            a_dict:
        Returns:

        """

        # 判断参数
        if a_dict is None or not a_dict:
            return MyStringUtils.EMPTY

        if not isinstance(a_dict, dict):
            raise ValueError("a_dict 不是字典")

        table_name = MyStringUtils.to_str(table_name, trim=True)
        if MyStringUtils.is_empty(table_name):
            raise ValueError("table_name 不能为空")

        table_name = Utility.do_filter_mysql_field(table_name, append_fix='`')

        columns = []
        values = []
        on_duplicate_key_update_list = []

        for tmp_k, tmp_v in a_dict.items():
            tmp_k = Utility.do_filter_mysql_field(tmp_k, append_fix='`')
            tmp_v = Utility.do_filter_mysql_param(tmp_v, append_fix="'")
            columns.append(tmp_k)
            values.append(tmp_v)
            tmp_on_duplcate_key_update_content = Utility.join_str(
                tmp_k, "=values(", tmp_v, ")",
                separator_str='', wrapper_str=None,
                ignore_none=False
            )
            on_duplicate_key_update_list.append(tmp_on_duplcate_key_update_content)

        return Utility.join_str_with_none(
            "insert ", table_name, "(",
            Utility.list_join_to_str(columns, separator_str=",", ignore_none=False),
            ") select ",
            Utility.list_join_to_str(values, separator_str=",", ignore_none=False),
            " ON DUPLICATE KEY UPDATE ",
            Utility.list_join_to_str(
                on_duplicate_key_update_list,
                separator_str=",", ignore_none=False
            )
        )

    @classmethod
    def gen_insertsql_multi(cls, table_name: str, a_list: Union[List[dict], Tuple[dict]]) -> Optional[List[str]]:
        r"""
        批量生成 数据库插入语句,

        a_list 中必须是字典,

        a_list 为 None时返回 None

        map 中的值必须是字符串或None

        当 map 中的值为 None 时,填充null值,

        当 map 为 None 或empty时返回空字符串

        Args:
            table_name:
            a_list:
        Returns:

        """

        # 判断参数
        if a_list is None:
            return None

        if not isinstance(a_list, (list, tuple)):
            raise ValueError("a_list 不是列表")

        if not a_list:
            return []

        result_list = []
        for tmp_dict in a_list:
            tmp_sql = cls.gen_insertsql(table_name, tmp_dict)
            result_list.append(tmp_sql)

        return result_list

    @classmethod
    def gen_part_sql_for_update(cls, update_dict: dict, ignore_none: bool = True) -> str:
        r"""
        生成update语句中 set k1=v1,k2=v2 这部分语句(不包括set)

        update中的值为null时,根据 ignore_none 的值决定时忽略该key,还是设置为 set key=null

        ignore_none 默认为True

        Args:
            update_dict:
            ignore_none:
        Returns:

        """
        # 检查参数
        if update_dict is None or not update_dict:
            return MyStringUtils.EMPTY

        if not isinstance(update_dict, dict):
            raise ValueError("update_dict 不是字典")

        col_values = []
        for tmp_k, tmp_v in update_dict.items():
            tmp_k = Utility.do_filter_mysql_field(tmp_k, append_fix='`')
            tmp_v = Utility.do_filter_mysql_param(tmp_v, append_fix="'")
            if tmp_v is None:
                if ignore_none:
                    continue
                else:
                    tmp_v = 'null'

            col_values.append(Utility.join_str(tmp_k, '=', tmp_v))

        if not col_values:
            raise create_base_exception(
                Utility.join_str_with_none("参数错误:",
                                           "update_dict所有的value都是None"),
                submitted_webarg=Utility.join_str_with_none(cls.__name__,
                                                            " gen_part_sql_for_update时 ",
                                                            "update_dict所有的value都是None")
            )

        return Utility.list_join_to_str(col_values, separator_str=',')

    @classmethod
    def gen_part_sql_for_where(cls, where_dict: dict, where_dict_can_be_none_or_empty: bool = False,
                               table_alias_name: str = None) -> str:
        r"""
        生成update语句中 where k1=v1 and k2=v2 这部分语句(不包括where)

        where_dict 中的值为null时,判断语句为 key is null

        当 where_dict 为 None 或 空字典时, 根据 where_dict_can_be_none_or_empty 决定是抛出异常还是返回 1=1

        where_dict_can_be_none_or_empty 默认为False

        Args:
            where_dict:
            where_dict_can_be_none_or_empty:
            table_alias_name: 表的别名
        Returns:

        """
        # 检查参数
        if not where_dict:
            if not where_dict_can_be_none_or_empty:
                raise create_base_exception(
                    Utility.join_str_with_none("参数错误:", "where_dict为空"),
                    submitted_webarg=Utility.join_str_with_none(
                        cls.__name__, " gen_part_sql_for_where时 ", "where_dict为空"
                    )
                )
            else:
                return '1=1'

        if not isinstance(where_dict, dict):
            raise ValueError("where_dict 不是字典")

        where_args = []
        table_alias_name = Utility.do_filter_mysql_field(
            table_alias_name, append_fix='`') if table_alias_name else table_alias_name
        for tmp_k, tmp_v in where_dict.items():
            tmp_k = Utility.do_filter_mysql_field(tmp_k, append_fix='`')
            if table_alias_name:
                tmp_k = Utility.join_str(table_alias_name, ".", tmp_k)
            tmp_v = Utility.do_filter_mysql_param(tmp_v, append_fix="'")
            if tmp_v is None:
                where_args.append(Utility.join_str(tmp_k, ' is null'))
            else:
                where_args.append(Utility.join_str(tmp_k, '=', tmp_v))

        return Utility.list_join_to_str(where_args, separator_str=' and ')

    @classmethod
    def gen_updatesql(cls, table_name: str, update_dict: dict, where_dict: dict, ignore_none: bool = True,
                      where_dict_can_be_none_or_empty: bool = False) -> str:
        r"""
        Build a complete ``update <table> set ... where ...`` statement.

        The assignment list comes from ``gen_part_sql_for_update`` and the
        condition list from ``gen_part_sql_for_where`` (called without a table
        alias).

        Args:
            table_name: Target table name. It is converted with
                ``MyStringUtils.to_str(..., trim=True)`` and then filtered with
                ``Utility.do_filter_mysql_field``.
            update_dict: Mapping of column name to new value.
            where_dict: Mapping of column name to the value it must equal; a
                None value becomes an ``is null`` test.
            ignore_none: Forwarded to ``gen_part_sql_for_update``; decides
                whether None-valued columns are skipped or set to ``null``.
                Defaults to True.
            where_dict_can_be_none_or_empty: When truthy, a None or empty
                ``where_dict`` is accepted and ``1=1`` is used as the
                condition; otherwise it raises. Defaults to False.

        Returns:
            The generated statement, or ``MyStringUtils.EMPTY`` when
            ``update_dict`` is None or empty (checked before anything else).

        Raises:
            ValueError: If ``table_name`` is empty according to
                ``MyStringUtils.is_empty``, or raised by the part builders for
                non-dict arguments.
            Exception: The object built by ``create_base_exception`` when
                ``where_dict`` is None or empty and that is not allowed.
        """
        # No columns to change means no statement at all.
        if not update_dict:
            return MyStringUtils.EMPTY

        # Refuse an unconditional update unless the caller opted in.
        if not (where_dict or where_dict_can_be_none_or_empty):
            message = Utility.join_str_with_none("参数错误:", "where_dict为空")
            webarg = Utility.join_str_with_none(cls.__name__, " gen_updatesql时 ", "where_dict为空")
            raise create_base_exception(message, submitted_webarg=webarg)

        normalized_name = MyStringUtils.to_str(table_name, trim=True)
        if MyStringUtils.is_empty(normalized_name):
            raise ValueError("table_name 不能为空")
        quoted_table = Utility.do_filter_mysql_field(normalized_name, append_fix='`')

        set_sql = cls.gen_part_sql_for_update(update_dict, ignore_none)
        where_sql = cls.gen_part_sql_for_where(where_dict, where_dict_can_be_none_or_empty)
        return Utility.join_str("update ", quoted_table, " set ", set_sql, " where ", where_sql)

    @classmethod
    def gen_delsql(cls, table_name: str, where_dict: dict, where_dict_can_be_none_or_empty: bool = False) -> str:
        r"""
        Build a complete ``delete from <table> where ...`` statement.

        The condition list comes from ``gen_part_sql_for_where`` (called
        without a table alias).

        Args:
            table_name: Target table name. It is converted with
                ``MyStringUtils.to_str(..., trim=True)`` and then filtered with
                ``Utility.do_filter_mysql_field``.
            where_dict: Mapping of column name to the value it must equal; a
                None value becomes an ``is null`` test.
            where_dict_can_be_none_or_empty: When truthy, a None or empty
                ``where_dict`` is accepted and ``1=1`` is used as the
                condition; otherwise it raises. Defaults to False.

        Returns:
            The generated statement.

        Raises:
            ValueError: If ``table_name`` is empty according to
                ``MyStringUtils.is_empty``, or if ``where_dict`` is a non-empty
                value that is not a dict.
            Exception: The object built by ``create_base_exception`` when
                ``where_dict`` is None or empty and that is not allowed.
        """
        # Refuse an unconditional delete unless the caller opted in.
        if not (where_dict or where_dict_can_be_none_or_empty):
            message = Utility.join_str_with_none("参数错误:", "where_dict为空")
            webarg = Utility.join_str_with_none(cls.__name__, " gen_delsql时 ", "where_dict为空")
            raise create_base_exception(message, submitted_webarg=webarg)

        normalized_name = MyStringUtils.to_str(table_name, trim=True)
        if MyStringUtils.is_empty(normalized_name):
            raise ValueError("table_name 不能为空")
        quoted_table = Utility.do_filter_mysql_field(normalized_name, append_fix='`')

        where_sql = cls.gen_part_sql_for_where(where_dict, where_dict_can_be_none_or_empty)
        return Utility.join_str("delete from ", quoted_table, " where ", where_sql)