#!/usr/bin/env python # -*- encoding: utf-8 -*- """ @Version: 1.0 @Python Version:3.6.6 @Author: ludq1 @Email: ludq1@chinaunicom.cn @date: 2023/04/07 11:40:00 @Description: """ import random import re import redis from redis.sentinel import Sentinel from .globalconst import RedisConnConst, RedisSentinelConnConst, RedisConst from .globalerror import RedisError, CommonError class RedisHandle: def __init__(self): r""" 初始化变量 """ # 初始化记录连接池的字段,key 为拼接的连接字符串, val为pool self.dict_conn_pool = {} pass def __del__(self): r""" 删除所有连接和池子 """ if self.dict_conn_pool is not None: for k, v in self.dict_conn_pool.items(): del v pass def check_parm(self, class_parm: any = None, dict_parm: dict = None) -> bool: r""" 通过类检测字典是否包含所有的key值 :param class_parm: 类名字 :param dict_parm: 字典 :return: """ if class_parm is not None and dict is not None: for k, v in class_parm.__dict__.items(): if isinstance(v, str) is True and re.match('^__(.*)__$', k) is None: if v not in dict_parm: raise ValueError(RedisError.ERROR_LACK_CONN_PARM + k) else: raise ValueError(CommonError.ERROR_LACK_PARM) return True def get_master(self, dict_sentinelinfo: dict = None) -> list: r""" :param dict_sentinelinfo: 连接字典参数 :return: """ list_ret_val = () try: sentinel = Sentinel((dict_sentinelinfo[RedisSentinelConnConst.CONN_HOSTS]), socket_timeout=0.1) list_ret_val = sentinel.discover_master(dict_sentinelinfo[RedisSentinelConnConst.CONN_CLUSTERNAME]) except redis.sentinel.MasterNotFoundError: raise ValueError( RedisError.ERROR_SENTI_MASTER_NOT_FOUND + dict_sentinelinfo[RedisSentinelConnConst.CONN_HOSTS]) return list_ret_val def get_slave(self, dict_sentinelinfo: dict = None) -> list: r""" :param dict_sentinelinfo: :return: """ list_ret_val = [] try: sentinel = Sentinel((dict_sentinelinfo[RedisSentinelConnConst.CONN_HOSTS]), socket_timeout=0.1) list_ret_val = sentinel.discover_slaves(dict_sentinelinfo[RedisSentinelConnConst.CONN_CLUSTERNAME]) except redis.sentinel.MasterNotFoundError: raise ValueError( RedisError.ERROR_SENTI_MASTER_NOT_FOUND + dict_sentinelinfo[RedisSentinelConnConst.CONN_HOSTS]) return list_ret_val def conn_redis(self, dict_redisinfo: dict = None, b_is_pool: bool = True) -> tuple: """ 用户登录处理 Args: dict_redisinfo: dict redis配置信息 b_is_pool: 是否启用连接池 Returns: dict """ try: if dict_redisinfo is not None: # 检查key是否都存在 for k, v in RedisConnConst.__dict__.items(): if isinstance(v, str) is True and re.match('^__(.*)__$', k) is None: if v not in dict_redisinfo: raise ValueError(RedisError.ERROR_LACK_CONN_PARM + v) else: raise ValueError(CommonError.ERROR_CONN_PARM) pool = None if b_is_pool is True: pool = redis.ConnectionPool(host=dict_redisinfo[RedisConnConst.CONN_IP], port=dict_redisinfo[RedisConnConst.CONN_PORT], db=dict_redisinfo[RedisConnConst.CONN_DB], password=dict_redisinfo[RedisConnConst.CONN_AUTH]) r = redis.Redis(connection_pool=pool) else: r = redis.Redis(host=dict_redisinfo[RedisConnConst.CONN_IP], port=dict_redisinfo[RedisConnConst.CONN_PORT], db=dict_redisinfo[RedisConnConst.CONN_DB], password=dict_redisinfo[RedisConnConst.CONN_AUTH]) except redis.exceptions.AuthenticationError: raise ValueError(RedisError.ERROR_AUTH + dict_redisinfo[RedisConnConst.CONN_AUTH]) except redis.exceptions.ConnectionError: raise ValueError(RedisError.ERROR_CONN + dict_redisinfo[RedisConnConst.CONN_IP] + ":" + dict_redisinfo[ RedisConnConst.CONN_PORT]) return r, pool def get_conn_redis_from_pool(self, redis_pool: any = None) -> any: r""" 从redis连接池获取连接 :param redis_pool: :return: """ try: if redis_pool is None: # 检查参数是否为null raise ValueError(RedisError.ERROR_CONN_POOL_NONE) r = redis.Redis(connection_pool=redis_pool) except redis.exceptions.AuthenticationError: raise ValueError(RedisError.ERROR_AUTH) except redis.exceptions.ConnectionError: raise ValueError(RedisError.ERROR_CONN) return r def init_redis_handle(self, dict_conn_info: dict = None, b_is_sentinel: bool = False, b_is_w: bool = False, b_is_pool: bool = True) -> tuple: r""" 初始化redis Args: dict_conn_info: 连接信息 b_is_sentinel: 是否是哨兵,默认否,不是哨兵直接是连接信息 b_is_w: 是否是写库 b_is_pool: 是否使用连接池 Returns: 返回元组, r,pool redis连接和redis连接池 """ r = None try: list_host: list = [] # 处理连接池字符串 if b_is_sentinel is True: str_conn = "_".join( [conn[0] + "_" + conn[1] for conn in dict_conn_info[RedisSentinelConnConst.CONN_HOSTS]]) else: str_conn = "_".join( [dict_conn_info[RedisConnConst.CONN_IP], dict_conn_info[RedisConnConst.CONN_PORT]]) # 确定redis是读还是写 if b_is_w is True: str_conn = str_conn + "_w" else: str_conn = str_conn + "_r" # 获取redis连接 if str_conn in self.dict_conn_pool: pool = self.dict_conn_pool[str_conn] r = self.get_conn_redis_from_pool(pool) else: if b_is_sentinel is True: dict_sentinel = {k: dict_conn_info[k] for k in (RedisSentinelConnConst.CONN_HOSTS, RedisSentinelConnConst.CONN_CLUSTERNAME)} if b_is_w is True: list_host = self.get_master(dict_sentinel) else: list_t = self.get_slave(dict_sentinel) list_host = list_t[random.randint(0, len(list_t) - 1)] else: list_host.append(dict_conn_info[RedisConnConst.CONN_IP]) list_host.append(dict_conn_info[RedisConnConst.CONN_PORT]) dict_redis = { RedisConnConst.CONN_IP: list_host[0] , RedisConnConst.CONN_PORT: list_host[1] , RedisConnConst.CONN_AUTH: dict_conn_info[RedisConnConst.CONN_AUTH] , RedisConnConst.CONN_DB: dict_conn_info[RedisConnConst.CONN_DB] } for k, v in RedisConnConst.__dict__.items(): # 检查是否是类自定义的属性 if isinstance(v, str) is True and re.match(r'^__(.*)__$', k) is None: if v not in dict_redis: raise ValueError(RedisError.ERROR_LACK_CONN_PARM + v) r, pool = self.conn_redis(dict_redis, b_is_pool) self.dict_conn_pool[str_conn] = pool except redis.exceptions.AuthenticationError: self.dict_conn_pool.pop(str_conn) raise ValueError(RedisError.ERROR_AUTH + dict_redis[RedisConnConst.CONN_AUTH]) except redis.exceptions.ConnectionError: self.dict_conn_pool.pop(str_conn) raise ValueError(RedisError.ERROR_CONN + dict_redis[RedisConnConst.CONN_IP] + ":" + dict_redis[ RedisConnConst.CONN_PORT]) except Exception as E: print(E) raise ValueError(str(E)) return r def get_vals(self, dict_kv: dict = None, dict_redisinfo: dict = None, b_is_sentinel: bool = False) -> dict: r""" 获取一组带映射的redis的kv :param dict_kv: 需要映射的key val值,即需要key,返回值的key为val :param dict_redisinfo: redis连接信息 :param b_is_sentinel: redis连接信息 :return: """ ret_dict = {} # 获取redis连接 r = self.init_redis_handle(dict_conn_info=dict_redisinfo, b_is_sentinel=b_is_sentinel, b_is_w=False) # 执行redis查询 for k, v in dict_kv.items(): # 处理映射路径 if r.exists(k): ret_dict[v] = r.get(k).decode('utf-8') else: continue return ret_dict def set_vals(self, dict_kv: dict = None, dict_redisinfo: dict = None, b_is_sentinel: bool = False) -> dict: r""" 获取一组带映射的redis的kv :param dict_kv: 需要设置的k。v值 :param dict_redisinfo: redis连接信息 :param b_is_sentinel: redis连接信息 :return: """ # 获取redis连接 r = self.init_redis_handle(dict_conn_info=dict_redisinfo, b_is_sentinel=b_is_sentinel, b_is_w=True) if RedisConst.EXPIRE_TIME in dict_redisinfo: ex = dict_redisinfo[RedisConst.EXPIRE_TIME] else: ex = None # 执行redis查询 for k, v in dict_kv.items(): # 处理映射路径 if isinstance(v, str) is False: raise ValueError(RedisError.ERROR_INSERT_VALUE_TYPE) r.set(name=k, value=v, ex=ex) return def set_key_expire(self, dict_kv: dict = None, dict_redisinfo: dict = None, b_is_sentinel: bool = False) -> bool: r""" 设置一组key时长 :param dict_kv: 需要设置的k。v值 :param dict_redisinfo: redis连接信息 :param b_is_sentinel: redis连接信息 :return: """ # 获取redis连接 r = self.init_redis_handle(dict_conn_info=dict_redisinfo, b_is_sentinel=b_is_sentinel, b_is_w=True) if RedisConst.EXPIRE_TIME in dict_redisinfo: ex = dict_redisinfo[RedisConst.EXPIRE_TIME] else: ex = None # 执行redis查询 for k, v in dict_kv.items(): # 处理映射路径 r.expire(name=k, time=ex) return True def del_key(self, dict_kv: dict = None, dict_redisinfo: dict = None, b_is_sentinel: bool = False) -> bool: r""" 设置一组key时长为0,即删除key :param dict_kv: 需要设置的k。v值 :param dict_redisinfo: redis连接信息 :return: """ dict_redisinfo_tmp = dict_redisinfo.copy() dict_redisinfo_tmp[RedisConst.EXPIRE_TIME] = 0 self.set_key_expire(dict_kv=dict_kv, dict_redisinfo=dict_redisinfo_tmp, b_is_sentinel=b_is_sentinel) return True def set_key_incr(self, dict_kv: dict = None, dict_redisinfo: dict = None, b_is_sentinel: bool = False) -> dict: r""" 设置一组key自增 :param dict_kv: 需要设置的k。v值 :param dict_redisinfo: redis连接信息 :param b_is_sentinel: redis连接信息 :return: """ ret_dict = {} # 获取redis连接 r = self.init_redis_handle(dict_conn_info=dict_redisinfo, b_is_sentinel=b_is_sentinel, b_is_w=True) # 执行redis查询 for k, v in dict_kv.items(): # 处理映射路径 ret_dict[k] = r.incr(name=k) return ret_dict def set_hash_vals(self, hash_key, attrs_dict=None, dict_redisinfo=None, b_is_sentinel=False): """ 为散列里面的一个或多个键设置值 :param hash_key: 散列键 :param attrs_dict: 键值对字典 :param dict_redisinfo: redis连接信息 :param b_is_sentinel: 是否是哨兵,默认否,不是哨兵直接是连接信息 :return: False / True 写入成功 """ # 获取redis连接 r = self.init_redis_handle(dict_conn_info=dict_redisinfo, b_is_sentinel=b_is_sentinel, b_is_w=True) # hash_key不能为空 if hash_key is None or len(hash_key.strip()) == 0 \ or not attrs_dict: return False # 批量添加属性 ret = r.hmset(hash_key, attrs_dict) return ret def get_hash_vals(self, hash_key, keys_list=None, dict_redisinfo=None, b_is_sentinel=False): """ 从散列里面获取一个或多个键的值 :param hash_key: 散列键 :param keys_list: 键值对列表 :param dict_redisinfo: redis连接信息 :param b_is_sentinel: 是否是哨兵,默认否,不是哨兵直接是连接信息 :return: None / {'': ''} :return: None / {'k1': 'v1' ,'k2': 'v2'} """ ret = {} # 获取redis连接 r = self.init_redis_handle(dict_conn_info=dict_redisinfo, b_is_sentinel=b_is_sentinel, b_is_w=False) # 散列表里存在hash_key if hash_key is None or r.hlen(hash_key) == 0: return None # 获取散列的键值结果 if keys_list: for key in keys_list: ret[key] = r.hmget(hash_key, key)[0] if r.hmget(hash_key, key) else None else: ret = r.hgetall(hash_key) return ret def delete_hash_vals(self, hash_key, key=None, dict_redisinfo=None, b_is_sentinel=False): """ 删除散列或者散列里面的一个或多个键值对 :param hash_key: 散列键 :param key: 键 :param dict_redisinfo: redis连接信息 :param b_is_sentinel: 是否是哨兵,默认否,不是哨兵直接是连接信息 :return: False / True 删除成功 """ # 获取redis连接 r = self.init_redis_handle(dict_conn_info=dict_redisinfo, b_is_sentinel=b_is_sentinel, b_is_w=True) # 散列表里存在hash_key if hash_key is None or r.hlen(hash_key) == 0: return False # 删除 if key is not None: return True if r.hdel(hash_key, key) > 0 else False # 删除整个hash dict_kv = {hash_key: ''} ret = self.del_key(dict_kv, dict_redisinfo=dict_redisinfo) return ret def set_set_vals(self, key, val=None, dict_redisinfo=None, b_is_sentinel=False): """ 将给定元素添加到集合 :param key: key :param val: value :param dict_redisinfo: redis连接信息 :param b_is_sentinel: 是否是哨兵,默认否,不是哨兵直接是连接信息 :return: None / 1 写入成功 """ # 获取redis连接 r = self.init_redis_handle(dict_conn_info=dict_redisinfo, b_is_sentinel=b_is_sentinel, b_is_w=True) # 散列表里存在hash_key if key is None or val is None: return None # 添加进set ret = r.sadd(key, val) return ret def get_set_vals(self, key, dict_redisinfo=None, b_is_sentinel=False): """ 返回集合包含的元素 :param key: 键 :param dict_redisinfo: redis连接信息 :param b_is_sentinel: 是否是哨兵,默认否,不是哨兵直接是连接信息 :return:None / [a, b] """ # 获取redis连接 r = self.init_redis_handle(dict_conn_info=dict_redisinfo, b_is_sentinel=b_is_sentinel, b_is_w=False) # key不为空,并判断key是否有指 if key is None or r.scard(key) == 0: return None # 返回集合包含的所有元素 ret = list(r.smembers(key)) return ret def publisher(self, channel, message=None, dict_redisinfo=None, b_is_sentinel=False): """ 向给定频道发送消息 :param channel: 频道 :param message: 消息 :param dict_redisinfo: redis连接信息 :param b_is_sentinel: 是否是哨兵,默认否,不是哨兵直接是连接信息 :return: 0 / 1 """ # 获取redis连接 r = self.init_redis_handle(dict_conn_info=dict_redisinfo, b_is_sentinel=b_is_sentinel, b_is_w=True) # channel和message不能为空 if channel is None or message is None: return 0 # 发布消息 ret = r.publish(channel, message) return ret def subscribe_channel(self, channel, dict_redisinfo=None, b_is_sentinel=False): """ 订阅给定的一个或多个频道 :param channel: 频道 :param dict_redisinfo: redis连接信息 :param b_is_sentinel: 是否是哨兵,默认否,不是哨兵直接是连接信息 :return: None / pubsub对象 """ # 获取redis连接 r = self.init_redis_handle(dict_conn_info=dict_redisinfo, b_is_sentinel=b_is_sentinel, b_is_w=False) # channel为空,返回None if channel is None: return None # 订阅消息 ps = r.pubsub() ps.subscribe(channel) return ps