util_methods.py 13.8 KB
Newer Older
qunfeng qiu's avatar
qunfeng qiu committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
"""
@Version: 1.0
@Python Version:3.6.6
@Author: ludq1
@Email: ludq1@chinaunicom.cn
@date: 2023/04/07 11:40:00
@Description:
"""

import logging
from typing import Optional, List

from .app_exception import AppException
from .base_const import ConstResponseCode
from .common_app_config import CommonAppConfig


class UtilMethods:
    @classmethod
    def get_intvalue_from_dict_and_check_range(
            cls,
            source_dict: dict,
            key_name: str,
            min_value: Optional[int] = None,
            max_value: Optional[int] = None,
            other_allowed_values: List[int] = None,
            action_desc: str = None,
            check_null: bool = True,
            default_value: int = 0,
    ) -> int:
        r"""
        从字典中获取指定key的值,并检查其是否是int,并检查其值的范围
        :param source_dict:
        :param key_name:
        :param min_value:可选值,提供则判断最小值,
        :param max_value:可选值
        :param other_allowed_values: 允许的值范围,例如 [-1], 在此范围内则不校验 最小/最大值
        :param action_desc: 抛出异常时的描述
        :param check_null: 默认为True,即值为None是抛出异常
        :param default_value: 默认值, None表示0

        """
        action_desc = action_desc or "从字典中获取字符串值"
        if not key_name:
            result = source_dict
        else:
            result = source_dict.get(key_name)

        if result is None:
            if check_null:
                if key_name:
                    raise AppException(
                        code=ConstResponseCode.CODE_MISSING_PARAMETER,
                        message=f"{action_desc}时未提供{key_name}"
                    )
                else:
                    raise AppException(
                        code=ConstResponseCode.CODE_MISSING_PARAMETER,
                        message=f"{action_desc}时源数据({result})为空"
                    )
            else:
                result = default_value if default_value is not None else 0
                if not isinstance(result, int):
                    if key_name:
                        raise AppException(
                            code=ConstResponseCode.CODE_SYS_ERROR,
                            message=f"{action_desc}时未提供{key_name},使用默认值 {result},但不是int类型"
                        )
                    else:
                        raise AppException(
                            code=ConstResponseCode.CODE_MISSING_PARAMETER,
                            message=f"{action_desc}时源数据({result})为空,使用默认值 {result},但不是int类型"
                        )
        elif not isinstance(result, int):
            if key_name:
                raise AppException(
                    code=ConstResponseCode.CODE_MISSING_PARAMETER,
                    message=f"{action_desc}{key_name}不是int类型"
                )
            else:
                raise AppException(
                    code=ConstResponseCode.CODE_MISSING_PARAMETER,
                    message=f"{action_desc}时源数据({result})不是int类型"
                )

        if other_allowed_values and result in other_allowed_values:
            return result

        if min_value is not None and result < min_value:
            if key_name:
                raise AppException(
                    code=ConstResponseCode.CODE_MISSING_PARAMETER,
                    message=f"{action_desc}{key_name}的值小于{min_value}"
                )
            else:
                raise AppException(
                    code=ConstResponseCode.CODE_MISSING_PARAMETER,
                    message=f"{action_desc}时源数据({result})小于{min_value}"
                )

        if max_value is not None and result > max_value:
            if key_name:
                raise AppException(
                    code=ConstResponseCode.CODE_MISSING_PARAMETER,
                    message=f"{action_desc}{key_name}的值大于{max_value}"
                )
            else:
                raise AppException(
                    code=ConstResponseCode.CODE_MISSING_PARAMETER,
                    message=f"{action_desc}时源数据({result})大于{max_value}"
                )
        return result

    @classmethod
    def get_strvalue_from_dict(
            cls,
            source_dict: dict,
            key_name: str,
            check_null_or_empty: bool = True,
            default_value: str = None,
            action_desc: str = None
    ) -> str:
        r"""
        从字典获取字符串,
        :param source_dict:
        :param key_name:
        :param check_null_or_empty:是否检查值是否为空或null,默认为True
        :param default_value: 默认值,默认为空字符串
        :param action_desc: 抛出异常时的描述
        """
        if not key_name:
            result_obj = source_dict
        else:
            result_obj = source_dict.get(key_name)
        if result_obj is not None:
            result_obj = f"{result_obj}"
        if check_null_or_empty and not result_obj:
            if key_name:
                raise AppException(
                    code=ConstResponseCode.CODE_MISSING_PARAMETER,
                    message=f"{action_desc}时未提供{key_name}"
                )
            else:
                raise AppException(
                    code=ConstResponseCode.CODE_MISSING_PARAMETER,
                    message=f"{action_desc}时源数据为空"
                )

        return result_obj or default_value or ""

    @classmethod
    def get_boolvalue_from_dict(
            cls,
            source_dict: dict,
            key_name: str,
            check_null: bool = True,
            default_value: bool = False,
            action_desc: str = None
    ) -> bool:
        r"""
        从字典获取bool值, 注意 default_value 是 None 时, 默认值为 None
        :param source_dict:
        :param key_name:
        :param check_null:是否检查值是否为空或null,默认为True
        :param default_value: 默认值,默认为 False
        :param action_desc: 抛出异常时的描述
        """
        if not key_name:
            result_obj = source_dict
        else:
            result_obj = source_dict.get(key_name)

        if result_obj is None:
            if check_null:
                if key_name:
                    raise AppException(
                        code=ConstResponseCode.CODE_MISSING_PARAMETER,
                        message=f"{action_desc}时未提供{key_name}"
                    )
                else:
                    raise AppException(
                        code=ConstResponseCode.CODE_MISSING_PARAMETER,
                        message=f"{action_desc}时源数据({result_obj})为None"
                    )

            else:
                result_obj = default_value

        if not isinstance(result_obj, bool):
            if key_name:
                raise AppException(
                    code=ConstResponseCode.CODE_MISSING_PARAMETER,
                    message=f"{action_desc}{key_name}不是bool类型"
                )
            else:
                raise AppException(
                    code=ConstResponseCode.CODE_MISSING_PARAMETER,
                    message=f"{action_desc}时源数据({result_obj})不是bool类型"
                )

        return result_obj

    @classmethod
    def get_objvalue_from_dict(
            cls,
            source_dict: dict,
            key_name: str,
            obj_class,
            action_desc: str = None,
            check_null: bool = True,
            **kwargs
    ):
        r"""
        从字典获取对象,
        :param source_dict:
        :param key_name:
        :param obj_class:
        :param action_desc:
        :param check_null:
        :param kwargs:
        """
        if not key_name:
            result_obj = source_dict
        else:
            result_obj = source_dict.get(key_name)

        if isinstance(result_obj, obj_class):
            return result_obj
        elif isinstance(result_obj, dict):
            kwargs.update(result_obj)
            return obj_class().init_by_create_dict(**kwargs)
        elif result_obj is None:
            if check_null:
                if key_name:
                    raise AppException(
                        code=ConstResponseCode.CODE_MISSING_PARAMETER,
                        message=f"{action_desc}时未提供{key_name}"
                    )
                else:
                    raise AppException(
                        code=ConstResponseCode.CODE_MISSING_PARAMETER,
                        message=f"{action_desc}时源数据({result_obj})为空"
                    )
            else:
                return None
        else:
            if key_name:
                raise AppException(
                    code=ConstResponseCode.CODE_MISSING_PARAMETER,
                    message=f"{action_desc}{key_name}为不支持的类型"
                )
            else:
                raise AppException(
                    code=ConstResponseCode.CODE_MISSING_PARAMETER,
                    message=f"{action_desc}时源数据({result_obj})为不支持的类型"
                )

    @classmethod
    def get_enumvalue_from_dict(
            cls,
            source_dict: dict,
            key_name: str,
            obj_class,
            action_desc: str = None,
            logger: logging.Logger = None,
            check_null: bool = True,
            default_value=None,
    ):
        r"""
        从字典获取枚举类型对象
        :param source_dict:
        :param key_name:
        :param obj_class
        :param action_desc:
        :param logger:
        :param check_null:
        :param default_value:
        """
        logger = logger or CommonAppConfig().common_logger
        if not key_name:
            result_obj = source_dict
        else:
            result_obj = source_dict.get(key_name)

        if isinstance(result_obj, obj_class):
            return result_obj
        elif result_obj is None:
            if check_null:
                if key_name:
                    raise AppException(
                        code=ConstResponseCode.CODE_MISSING_PARAMETER,
                        message=f"{action_desc}时未提供{key_name}"
                    )
                else:
                    raise AppException(
                        code=ConstResponseCode.CODE_MISSING_PARAMETER,
                        message=f"{action_desc}时源数据({result_obj})为None"
                    )
            else:
                return default_value
        else:
            try:
                return obj_class(str(result_obj))
            except BaseException as e:
                encountered_e = e

            if encountered_e:
                logger.error(encountered_e)
                if key_name:
                    raise AppException(
                        code=ConstResponseCode.CODE_MISSING_PARAMETER,
                        message=f"{action_desc}{key_name}不是有效的{obj_class.__name__}值类型"
                    ).with_traceback(encountered_e.__traceback__)
                else:
                    raise AppException(
                        code=ConstResponseCode.CODE_MISSING_PARAMETER,
                        message=f"{action_desc}时源数据({result_obj})不是有效的{obj_class.__name__}值类型"
                    ).with_traceback(encountered_e.__traceback__)

    @classmethod
    def get_listvalue_from_dict(
            cls,
            source_dict: dict,
            key_name: str,
            obj_class,
            check_null: bool = False,
            action_desc: str = None
    ) -> list:
        r"""
        从字典获取列表,
        :param source_dict:
        :param key_name:
        :param obj_class
        :param check_null: 默认为False
        :param action_desc:
        """
        if not key_name:
            tmp_key_value = source_dict
        else:
            tmp_key_value = source_dict.get(key_name)

        if not tmp_key_value:
            if check_null:
                if key_name:
                    raise AppException(
                        code=ConstResponseCode.CODE_MISSING_PARAMETER,
                        message=f"{action_desc}时未提供{key_name}"
                    )
                else:
                    raise AppException(
                        code=ConstResponseCode.CODE_MISSING_PARAMETER,
                        message=f"{action_desc}时源数据为空"
                    )

            else:
                return list()

        if not isinstance(tmp_key_value, list):
            if key_name:
                raise AppException(
                    code=ConstResponseCode.CODE_MISSING_PARAMETER,
                    message=f"{action_desc}时提供{key_name}不是列表"
                )
            else:
                raise AppException(
                    code=ConstResponseCode.CODE_MISSING_PARAMETER,
                    message=f"{action_desc}时源数据({tmp_key_value})不是列表"
                )

        result_obj = list()
        for tmp_obj in tmp_key_value:
            if isinstance(tmp_obj, obj_class):
                result_obj.append(tmp_obj)
            elif isinstance(tmp_obj, dict):
                result_obj.append(obj_class().init_by_create_dict(**tmp_obj))
            else:
                if key_name:
                    raise AppException(
                        code=ConstResponseCode.CODE_MISSING_PARAMETER,
                        message=f"{action_desc}{key_name}中的对象是不支持的类型"
                    )
                else:
                    raise AppException(
                        code=ConstResponseCode.CODE_MISSING_PARAMETER,
                        message=f"{action_desc}时源数据({tmp_key_value})中的对象是不支持的类型"
                    )

        return result_obj