#!/usr/bin/env python # -*- encoding: utf-8 -*- """ @Version: 1.0 @Python Version:3.6.6 @Author: ludq1 @Email: ludq1@chinaunicom.cn @date: 2023/04/07 11:40:00 @Description: """ import random from typing import List, Dict import redis from redis import StrictRedis from redis.sentinel import Sentinel from .common_app_config import CommonAppConfig from .utils_base import UtilityBaseV2 class BaseRedisService: r""" redis操作基类 """ # 当调用jedis API时发生错误后重试的次数, 默认为1 retry_times_when_exception: int = 1 app_utils: UtilityBaseV2 = None need_decode_responses: bool = True r""" 获取到redis函数结果后是否需要解码 如果conn的初始化中包含 decode_responses=True,则应该设置此属性为False, 即获取到的redis函数结果已经是解码后的数据了,不需要再次解码 """ encoding_used_for_decode_response: str = "utf-8" def __init__(self, *args, **kwargs): r""" 初始化 :key retry_times_when_exception 当操作发生异常时的重试次数,默认为 1 :key app_utils: 使用的app_utils便捷类,默认自动生成 :key logger: 用于打印日志 """ retry_times_when_exception = kwargs.get("retry_times_when_exception") app_utils = kwargs.get("app_utils") self.retry_times_when_exception = retry_times_when_exception if retry_times_when_exception > 0 else 1 self.app_utils = app_utils if app_utils else UtilityBaseV2( logger=kwargs.get("logger") or CommonAppConfig().common_logger, ) self.logger = kwargs.get("logger") or CommonAppConfig().common_logger def set(self, name: str, value: str, ex: int = None, px: int = None, nx: bool = False, xx: bool = False) -> bool: r""" Set the value at key ``name`` to ``value`` , 返回 bool 表示是否成功执行 :param name :param value :param ex sets an expire flag on key ``name`` for ``ex`` seconds. :param px sets an expire flag on key ``name`` for ``px`` milliseconds. :param nx if set to True, set the value at key ``name`` to ``value`` only if it does not exist. :param xx if set to True, set the value at key ``name`` to ``value`` only if it already exists. """ def operation_func(): return self.get_master_conn().set(name=name, value=value, ex=ex, px=px, nx=nx, xx=xx) return self.app_utils.common_operation(retry_times_when_exception=self.retry_times_when_exception, operation_func=operation_func, on_operation_failed=self.on_operation_failed, on_operation_completed=self.on_operation_completed) def get(self, name: str) -> str: r""" Return the value at key ``name``, or None if the key doesn't exist,返回 utf-8解码的字符串 :param name :return """ def operation_func(): result = self.get_slave_conn().get(name=name) if self.need_decode_responses and result is not None: result = str(result, self.encoding_used_for_decode_response) return result return self.app_utils.common_operation(retry_times_when_exception=self.retry_times_when_exception, operation_func=operation_func, on_operation_failed=self.on_operation_failed, on_operation_completed=self.on_operation_completed) def mset(self, name_value_dict: dict) -> bool: r""" Sets key/values based on a mapping. Mapping is a dictionary of key/value pairs. Both keys and values should be strings or types that can be cast to a string via str() , 返回 bool 表示是否成功执行 :param name_value_dict :return """ def operation_func(): return self.get_master_conn().mset(mapping=name_value_dict) return self.app_utils.common_operation(retry_times_when_exception=self.retry_times_when_exception, operation_func=operation_func, on_operation_failed=self.on_operation_failed, on_operation_completed=self.on_operation_completed) def mget(self, *to_get_names: str) -> list: r""" Returns a list of values ordered identically to ``keys``, 返回 utf-8解码的字符串list :param to_get_names :return """ def operation_func(): result = self.get_slave_conn().mget(keys=to_get_names) if self.need_decode_responses and result is not None: result = [ str(tmp_byte_value, self.encoding_used_for_decode_response) if tmp_byte_value is not None else None for tmp_byte_value in result] return result return self.app_utils.common_operation(retry_times_when_exception=self.retry_times_when_exception, operation_func=operation_func, on_operation_failed=self.on_operation_failed, on_operation_completed=self.on_operation_completed) def hset(self, name: str, key: str, value: str) -> int: r""" Set ``key`` to ``value`` within hash ``name`` Returns 1 if HSET created a new field, otherwise 0, 返回 int 表示是否成功插入的key的数量 :param name :param key :param value """ def operation_func(): return self.get_master_conn().hset(name=name, key=key, value=value) return self.app_utils.common_operation(retry_times_when_exception=self.retry_times_when_exception, operation_func=operation_func, on_operation_failed=self.on_operation_failed, on_operation_completed=self.on_operation_completed) def hget(self, name: str, key: str) -> str: r""" Returns a list of values ordered identically to ``keys``,返回 utf-8解码的字符串list :param name :param key """ def operation_func(): result = self.get_slave_conn().hget(name=name, key=key) if self.need_decode_responses and result is not None: result = str(result, self.encoding_used_for_decode_response) return result return self.app_utils.common_operation(retry_times_when_exception=self.retry_times_when_exception, operation_func=operation_func, on_operation_failed=self.on_operation_failed, on_operation_completed=self.on_operation_completed) def hmset(self, name: str, key_value_dict: dict) -> bool: r""" Set key to value within hash ``name`` for each corresponding key and value from the ``mapping`` dict. , 返回 bool 表示是否成功执行 :param name :param key_value_dict """ def operation_func(): return self.get_master_conn().hmset(name=name, mapping=key_value_dict) return self.app_utils.common_operation(retry_times_when_exception=self.retry_times_when_exception, operation_func=operation_func, on_operation_failed=self.on_operation_failed, on_operation_completed=self.on_operation_completed) def hmget(self, *to_get_keys: str, name: str) -> list: r""" Returns a list of values ordered identically to ``keys``, 返回 utf-8解码的字符串list :param to_get_keys :param name """ def operation_func(): result = self.get_slave_conn().hmget(name=name, keys=to_get_keys) if self.need_decode_responses and result is not None: result = [ str(tmp_byte_value, self.encoding_used_for_decode_response) if tmp_byte_value is not None else None for tmp_byte_value in result] return result return self.app_utils.common_operation(retry_times_when_exception=self.retry_times_when_exception, operation_func=operation_func, on_operation_failed=self.on_operation_failed, on_operation_completed=self.on_operation_completed) def hgetall(self, name: str) -> dict: r""" Return a Python dict of the hash's name/value pairs , 返回 utf-8解码的key,value字符串 的 dict :param name """ def operation_func(): result = self.get_slave_conn().hgetall(name=name) if self.need_decode_responses and result is not None: result = {str(tmp_byte_key, self.encoding_used_for_decode_response): str(result[tmp_byte_key], self.encoding_used_for_decode_response) for tmp_byte_key in result.keys()} return result return self.app_utils.common_operation(retry_times_when_exception=self.retry_times_when_exception, operation_func=operation_func, on_operation_failed=self.on_operation_failed, on_operation_completed=self.on_operation_completed) def expire(self, name: str, time: int, use_second: bool = False, use_millisecond: bool = False) -> bool: r""" 设置key的过期时间, 从现在开始算过多少秒或毫秒之后key会被删除 如果 use_second 为 True ,则 time 单位为秒的int或者 python timedelta object. 如果 use_millisecond 为 True , 则 time 单位为毫秒的int或者 python timedelta object. use_second 和 use_millisecond 必须至少有一个为True 返回 bool 表示是否成功执行 :param name :param time :param use_millisecond :param use_second """ def operation_func(): if use_second: return self.get_master_conn().expire(name=name, time=time) elif use_millisecond: return self.get_master_conn().pexpire(name=name, time=time) else: raise RuntimeError('use_second 和 use_millisecond 必须至少有一个为True') return self.app_utils.common_operation(retry_times_when_exception=self.retry_times_when_exception, operation_func=operation_func, on_operation_failed=self.on_operation_failed, on_operation_completed=self.on_operation_completed) def expireat(self, name: str, when: int, use_second: bool = False, use_millisecond: bool = False) -> bool: r""" 设置key的过期时间, 在指定的时间点key会被删除 如果 use_second 为 True ,则 when 为 unix time or a Python datetime object 如果 use_millisecond 为 True , 则 when 为 unix time in milliseconds (unix time * 1000) or a Python datetime object. use_second 和 use_millisecond 必须至少有一个为True 返回 bool 表示是否成功执行 :param name :param when :param use_second :param use_millisecond """ def operation_func(): if use_second: return self.get_master_conn().expireat(name=name, when=when) elif use_millisecond: return self.get_master_conn().pexpireat(name=name, when=when) else: raise RuntimeError('use_second 和 use_millisecond 必须至少有一个为True') return self.app_utils.common_operation(retry_times_when_exception=self.retry_times_when_exception, operation_func=operation_func, on_operation_failed=self.on_operation_failed, on_operation_completed=self.on_operation_completed) def delete(self, *to_delete_names: str) -> int: r""" Delete one or more keys specified by ``names`` 返回 int 表示是否成功删除的key的数量 :param to_delete_names """ def operation_func(): return self.get_master_conn().delete(*to_delete_names) return self.app_utils.common_operation(retry_times_when_exception=self.retry_times_when_exception, operation_func=operation_func, on_operation_failed=self.on_operation_failed, on_operation_completed=self.on_operation_completed) def keys(self, pattern: str) -> list: r""" Returns a list of keys matching ``pattern``, 返回 utf-8解码的字符串list :param pattern """ def operation_func(): result = self.get_slave_conn().keys(pattern=pattern) if self.need_decode_responses and result is not None: result = [str(tmp_byte_value, self.encoding_used_for_decode_response) for tmp_byte_value in result] return result return self.app_utils.common_operation(retry_times_when_exception=self.retry_times_when_exception, operation_func=operation_func, on_operation_failed=self.on_operation_failed, on_operation_completed=self.on_operation_completed) def delete_by_pattern(self, pattern: str) -> int: r""" 按 pattern 删除key, 返回 int 表示是否成功删除的key的数量 :param pattern """ def operation_func(): key_list = self.get_slave_conn().keys(pattern=pattern) if key_list: return self.get_master_conn().delete(*key_list) else: return 0 return self.app_utils.common_operation(retry_times_when_exception=self.retry_times_when_exception, operation_func=operation_func, on_operation_failed=self.on_operation_failed, on_operation_completed=self.on_operation_completed) def on_operation_failed(self): r""" 当redis操作失败时的动作,一般是重建连接,或者等待几秒之类的操作 :return: """ pass def on_operation_completed(self): r""" 当redis操作完成时的动作,一般是什么都不做 :return: """ pass def get_master_conn(self) -> StrictRedis: r""" 获取master连接 :return: """ raise RuntimeError("Not implemented") def get_slave_conn(self) -> StrictRedis: r""" 获取slave连接 :return: """ raise RuntimeError("Not implemented") class RedisServiceMasterAndSlave(BaseRedisService): r""" master redis和slave redis分别设置的redis帮助服务 """ master_redis_config_dict_list: List[Dict] = None slave_redis_config_dict_list: List[Dict] = None master_redis_conn_list: List[StrictRedis] = None slave_redis_conn_list: List[StrictRedis] = None def __init__(self, *args, **kwargs): r""" 初始化 :key retry_times_when_exception 当操作发生异常时的重试次数,默认为 1 :key app_utils: 使用的app_utils便捷类,默认自动生成 :key master_redis_config_dict_list master redis连接的dict列表 :key slave_redis_config_dict_list slave redis连接的dict列表 字典的key和value组成参考 redis.Redis()的init函数,大致内容有 host='localhost', port=6379, db=0, password=None, socket_timeout=None, socket_connect_timeout=None, socket_keepalive=None, socket_keepalive_options=None, connection_pool=None, unix_socket_path=None, encoding='utf-8', encoding_errors='strict', charset=None, errors=None, decode_responses=False, retry_on_timeout=False, ssl=False, ssl_keyfile=None, ssl_certfile=None, ssl_cert_reqs='required', ssl_ca_certs=None, ssl_check_hostname=False, max_connections=None, single_connection_client=False, health_check_interval=0, client_name=None, username=None """ super().__init__(*args, **kwargs) self.master_redis_config_dict_list = kwargs.get("master_redis_config_dict_list") self.slave_redis_config_dict_list = kwargs.get("slave_redis_config_dict_list") self.need_decode_responses = False if self.master_redis_config_dict_list[0].get("decode_responses") else True encoding_used_for_decode_response = self.master_redis_config_dict_list[0].get("encoding") self.encoding_used_for_decode_response = encoding_used_for_decode_response if encoding_used_for_decode_response else 'utf-8' self._init_redis_conn() def get_master_conn(self): return self.master_redis_conn_list[random.choice(range(len(self.master_redis_conn_list)))] def get_slave_conn(self): return self.slave_redis_conn_list[random.choice(range(len(self.slave_redis_conn_list)))] def on_operation_failed(self): self._init_redis_conn() def _init_redis_conn(self): self.master_redis_conn_list = [redis.StrictRedis(**tmp_dict) for tmp_dict in self.master_redis_config_dict_list] self.slave_redis_conn_list = [redis.StrictRedis(**tmp_dict) for tmp_dict in self.slave_redis_config_dict_list] class RedisServiceSentinel(BaseRedisService): r""" 哨兵版redis集群的redis帮助服务 """ redis_sentinel_config_dict: dict = None sentinel: Sentinel = None service_name: str = None def __init__(self, *args, **kwargs): r""" 初始化 :key retry_times_when_exception 当操作发生异常时的重试次数,默认为 1 :key app_utils: 使用的app_utils便捷类,默认自动生成 :key service_name: 通过哨兵查询redis server使用的服务名 :key redis_sentinel_config_dict redis哨兵连接的dict配置信息 字典的key和value组成参考 redis.Sentinel 的init函数,大致内容有 sentinels 一组节点,每个节点使用 (hostname, port) 表示 min_other_sentinels 为一个哨兵定义一个peers的最小节点数,当查询一个哨兵时,如果没有达到这个阈值,其响应不被认为是有效的,默认为0 sentinel_kwargs 是一个字典,用于连接到哨兵时使用, 和 redis.Redis 的init函数中需要的参数一样, 如果没有提供该值,则使用 connection_kwargs 中以 socket_ 开头的参数作为 sentinel_kwargs connection_kwargs 中的参数用于创建 redis server时使用 基本上 sentinel_kwargs 只需要 password=None, socket_timeout=None, connection_kwargs 中需要 db=0, password=None, socket_timeout=None, 其中 sentinel_kwargs 和 connection_kwargs 可用的参数有 db=0, password=None, socket_timeout=None, socket_connect_timeout=None, socket_keepalive=None, socket_keepalive_options=None, connection_pool=None, unix_socket_path=None, encoding='utf-8', encoding_errors='strict', charset=None, errors=None, decode_responses=False, retry_on_timeout=False, ssl=False, ssl_keyfile=None, ssl_certfile=None, ssl_cert_reqs='required', ssl_ca_certs=None, ssl_check_hostname=False, max_connections=None, single_connection_client=False, health_check_interval=0, client_name=None, username=None """ super().__init__(*args, **kwargs) self.service_name = kwargs.get("service_name") self.redis_sentinel_config_dict = kwargs.get("redis_sentinel_config_dict") sentinels = self.redis_sentinel_config_dict.get("sentinels") min_other_sentinels = self.redis_sentinel_config_dict.get("min_other_sentinels") sentinel_kwargs = self.redis_sentinel_config_dict.get("sentinel_kwargs") connection_kwargs = self.redis_sentinel_config_dict.get("connection_kwargs") self.need_decode_responses = False if connection_kwargs and connection_kwargs.get("decode_responses") else True encoding_used_for_decode_response = connection_kwargs.get("encoding") if connection_kwargs else None self.encoding_used_for_decode_response = encoding_used_for_decode_response if encoding_used_for_decode_response else 'utf-8' if connection_kwargs: self.sentinel = Sentinel(sentinels=sentinels, min_other_sentinels=min_other_sentinels, sentinel_kwargs=sentinel_kwargs, **connection_kwargs) else: self.sentinel = Sentinel(sentinels=sentinels, min_other_sentinels=min_other_sentinels, sentinel_kwargs=sentinel_kwargs) def get_master_conn(self): return self.sentinel.master_for(service_name=self.service_name) def get_slave_conn(self): return self.sentinel.slave_for(service_name=self.service_name) def on_operation_failed(self): pass class RedisServiceCluster(BaseRedisService): r""" 集群版redis集群的redis帮助服务 """ redis_server_config_dict_list: List[Dict] = None redis_server_conn_list: List[StrictRedis] = None def __init__(self, *args, **kwargs): r""" 初始化 :key retry_times_when_exception 当操作发生异常时的重试次数,默认为 1 :key app_utils: 使用的app_utils便捷类,默认自动生成 :key redis_server_config_dict_list redis连接的dict列表 字典的key和value组成参考 redis.Redis()的init函数,大致内容有 host='localhost', port=6379, db=0, password=None, socket_timeout=None, socket_connect_timeout=None, socket_keepalive=None, socket_keepalive_options=None, connection_pool=None, unix_socket_path=None, encoding='utf-8', encoding_errors='strict', charset=None, errors=None, decode_responses=False, retry_on_timeout=False, ssl=False, ssl_keyfile=None, ssl_certfile=None, ssl_cert_reqs='required', ssl_ca_certs=None, ssl_check_hostname=False, max_connections=None, single_connection_client=False, health_check_interval=0, client_name=None, username=None """ super().__init__(*args, **kwargs) self.redis_server_config_dict_list = kwargs.get("redis_server_config_dict_list") self.need_decode_responses = False if self.redis_server_config_dict_list[0].get("decode_responses") else True encoding_used_for_decode_response = self.redis_server_config_dict_list[0].get("encoding") self.encoding_used_for_decode_response = encoding_used_for_decode_response if encoding_used_for_decode_response else 'utf-8' self._init_redis_conn() def get_master_conn(self): return self.redis_server_conn_list[random.choice(range(len(self.redis_server_conn_list)))] def get_slave_conn(self): return self.redis_server_conn_list[random.choice(range(len(self.redis_server_conn_list)))] def on_operation_failed(self): self._init_redis_conn() def _init_redis_conn(self): self.redis_server_conn_list = [redis.StrictRedis(**tmp_dict) for tmp_dict in self.redis_server_config_dict_list]