my_anykeyvalue_utils.py 4.44 KB
Newer Older
qunfeng qiu's avatar
qunfeng qiu committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
"""
@Version: 1.0
@Python Version:3.6.6
@Author: ludq1
@Email: ludq1@chinaunicom.cn
@date: 2023/04/07 11:40:00
@Description:
"""

from typing import List, Union

from .globalconst import EnvNameConst, GlobalRetKeyConst
from .globalutility import Utility
from .my_baseexception import create_base_exception
from .my_servicehandle import ServiceHandle
from .my_stringutils import MyStringUtils


class AnyKeyValueUtils:
    """
    Convenience helpers for a generic key/value table queried through a SQL service.

    The class only holds defaults (service name, API paths, table and column
    names) plus class-level helpers; it is not meant to be instantiated.
    NOTE(review): the original description also mentions support for database
    locking operations; the lookup defined below is a plain read.
    """

    # Default service name and API paths used to build the service URL.
    DEFAULT_SERVICE_NAME = 'csm-mysqlpool'
    DEFAULT_READ_API_PATH = '/mysqlpool/standardsql/serviceauth/auth/r'
    DEFAULT_WRITE_API_PATH = '/mysqlpool/trans/serviceauth/auth/w'

    # Default key type, table name and column names of the key/value table.
    DEFAULT_KEY_TYPE = 'auto-generated'
    DEFAULT_TABLENAME = 'any_key_info'
    DEFAULT_COLUMNNAME_KEYID = "key_id"
    DEFAULT_COLUMNNAME_KEYTYPE = "key_type"
    DEFAULT_COLUMNNAME_KEYINFO = "key_info"
    DEFAULT_COLUMNNAME_REMARK = "remark"

    @classmethod
    def query_any_key(cls, key_id: Union[List[str], str], key_type: str = DEFAULT_KEY_TYPE,
                      table_name: str = DEFAULT_TABLENAME, col_keyid: str = DEFAULT_COLUMNNAME_KEYID,
                      col_keytype: str = DEFAULT_COLUMNNAME_KEYTYPE, col_keyinfo: str = DEFAULT_COLUMNNAME_KEYINFO,
                      service_name: str = DEFAULT_SERVICE_NAME, api_path: str = DEFAULT_READ_API_PATH) -> dict:
        """
        Look up the info stored for one or more key ids of a given key type.

        Args:
            key_id: A single key id, or a list of key ids, to look up.
            key_type: Value the key-type column must equal.
            table_name: Table to query.
            col_keyid: Name of the key-id column.
            col_keytype: Name of the key-type column.
            col_keyinfo: Name of the key-info column.
            service_name: Service name passed to
                ``ServiceHandle.get_info_with_fromenv_first`` together with
                ``EnvNameConst.ENV_MYSQLPOOL_SERVICE`` (presumably an
                environment override takes precedence -- confirm in that helper).
            api_path: API path appended to the resolved service name; a leading
                ``/`` is added when missing.

        Returns:
            A dict mapping key id to the value of the key-info column. Every
            requested key id is present in the result; ids without a matching
            row map to ``MyStringUtils.EMPTY``.

        Raises:
            The exception built by ``create_base_exception`` when ``key_id`` is
            empty/None or when the query service does not report success.
        """

        if not key_id:
            raise create_base_exception("参数错误:未提供 keyinfo", submitted_webarg=Utility.join_str_with_none(cls.__name__,
                                                                                                        ' queryAnyKey时 未提供 keyinfo'))

        # Normalise a single id to a one-element sequence so the rest of the
        # method can always iterate.
        if isinstance(key_id, str):
            key_id = (key_id,)
        key_id_list = key_id

        # Build the SQL statement. Values and identifiers are passed through
        # the project's filter helpers before being concatenated into the query.
        filtered_idlist = [Utility.do_filter_mysql_param(tmp_key) for tmp_key in key_id_list]
        filtered_keytype = Utility.do_filter_mysql_param(key_type)
        filtered_tablename = Utility.do_filter_mysql_field(table_name)
        filtered_col_keyid = Utility.do_filter_mysql_field(col_keyid)
        filtered_col_keytype = Utility.do_filter_mysql_field(col_keytype)
        filtered_col_keyinfo = Utility.do_filter_mysql_field(col_keyinfo)

        filtered_col_list = [filtered_col_keyid, filtered_col_keyinfo]

        query_str = Utility.join_str_with_none('select ', Utility.list_join_to_str(filtered_col_list), ' from ',
                                               filtered_tablename, ' where ', filtered_col_keyid, ' in (',
                                               Utility.list_join_to_str(filtered_idlist), ') and ',
                                               filtered_col_keytype, '=', filtered_keytype)

        # Build the service URL: resolved service name (leading '/' stripped)
        # followed by an API path that always starts with '/'.
        service_name = ServiceHandle.get_info_with_fromenv_first(service_name, EnvNameConst.ENV_MYSQLPOOL_SERVICE)
        service_name = service_name.lstrip('/')
        if not api_path.startswith('/'):
            api_path = Utility.join_str('/', api_path)
        service_url = Utility.join_str(service_name, api_path)

        # Call the query service.
        ret_dict = ServiceHandle.querydb(service_url=service_url, query_str=query_str)

        # Fail loudly when the service did not report success.
        if not ServiceHandle.check_is_success(ret_dict):
            raise create_base_exception("查询key信息失败", submitted_webarg=Utility.join_str_with_none('查询key信息失败:',
                                                                                                 ret_dict.get(
                                                                                                     GlobalRetKeyConst.RET_VAL)))

        # Nothing found at all: every requested key maps to the empty value.
        if int(ret_dict.get(GlobalRetKeyConst.ROWCOUNT)) == 0:
            return {tmp_key: MyStringUtils.EMPTY for tmp_key in key_id_list}

        # Collect the rows that were found.
        # NOTE(review): rows are read with the unfiltered column names; this
        # assumes do_filter_mysql_field leaves valid names unchanged -- confirm.
        data_rows = ServiceHandle.safe_get_datarows(ret_dict)
        result = {tmp_dict.get(col_keyid): tmp_dict.get(col_keyinfo) for tmp_dict in data_rows}

        # Fill in requested keys that have no row. This must be an explicit
        # loop: in Python 3 a bare map(...) is lazy and would never run.
        for tmp_key in key_id_list:
            result.setdefault(tmp_key, MyStringUtils.EMPTY)
        return result