#!/usr/bin/env python
# -*- encoding: utf-8 -*-
"""
@Version: 1.0
@Python Version:3.6.6
@Author: ludq1
@Email: ludq1@chinaunicom.cn
@date: 2023/04/07 11:40:00
@Description: Resolve Redis and Redis Sentinel connection settings from
              explicit arguments, environment variables, or built-in defaults.
"""
import os
from typing import Optional

from .globalconst import RedisConnConst, RedisSentinelConnConst, RedisSentinelConnEnvConst, RedisConnEnvConst, \
    RedisInfoKeysConst

# Built-in fallbacks, used only when neither an argument nor a non-empty
# environment variable supplies the value.
_DEFAULT_REDIS_PORT = "6379"
_DEFAULT_REDIS_DB = "9"
# NOTE(review): hard-coded fallback credential kept for backward compatibility;
# prefer supplying the password through the environment or the method argument.
_DEFAULT_REDIS_AUTH = "!@#rjyjy"


def _resolve(explicit, env_key, default):
    """Return the first configured value for one setting.

    Precedence: ``explicit`` when it is not ``None`` (even if empty), then the
    environment variable ``env_key`` when it is set and non-empty, then
    ``default``.
    """
    if explicit is not None:
        return explicit
    return os.environ.get(env_key) or default


def _resolve_endpoint(address, ip_env_key, port_env_key, default_ip):
    """Return ``(ip, port)`` for one Redis node.

    Args:
        address: ``"ip:port"`` string, or ``None`` to use environment/defaults.
        ip_env_key: name of the environment variable holding the ip/host.
        port_env_key: name of the environment variable holding the port.
        default_ip: host used when ``address`` is ``None`` and the environment
            variable is unset or empty.

    Returns:
        Tuple ``(ip, port)``; both are returned as given (strings, not parsed).
    """
    fallback_port = _resolve(None, port_env_key, _DEFAULT_REDIS_PORT)
    if address is None:
        return _resolve(None, ip_env_key, default_ip), fallback_port

    # Same fields as before (text before the first ':' and text between the
    # first and second ':'), but an address without any ':' now falls back to
    # the configured port instead of raising IndexError.
    parts = address.split(':')
    port = parts[1] if len(parts) > 1 else fallback_port
    return parts[0], port


class CfgInfoHandle:
    """Builds connection-info dictionaries for Redis and Redis Sentinel."""

    def __init__(self, flask_app=None):
        """Store the optional Flask app and pass it to ``init_app`` when given.

        Args:
            flask_app: Flask application object, or ``None`` when the handler
                is used without Flask.
        """
        self.app = flask_app
        if flask_app is not None:
            self.init_app(flask_app)

    @classmethod
    def init_app(cls, app):
        """Flask integration hook; currently does nothing.

        Args:
            app: Flask application object (unused).

        Returns:
            None
        """

    def get_redis_info(self, str_master_ip_port: Optional[str] = None, str_slave_ip_port: Optional[str] = None,
                       str_redis_auth: Optional[str] = None, str_redis_db: Optional[str] = None) -> dict:
        """Build the connection info for the Redis master and slave nodes.

        Each setting is taken from the argument when it is not ``None``,
        otherwise from its environment variable (when set and non-empty),
        otherwise from a built-in default.

        Args:
            str_master_ip_port: master node as ``"ip:port"``; when the port
                part is missing the configured/default port is used.
            str_slave_ip_port: slave node as ``"ip:port"``; same port fallback.
            str_redis_auth: Redis password.
            str_redis_db: Redis database index.

        Returns:
            Dict with ``RedisInfoKeysConst.REDIS_MASTER_KEY`` and
            ``RedisInfoKeysConst.REDIS_SLAVE_KEY`` entries, each a dict keyed
            by the ``RedisConnConst`` ip/port/auth/db constants. The password
            and db index are shared by both entries.
        """
        master_ip, master_port = _resolve_endpoint(
            str_master_ip_port, RedisConnEnvConst.CONN_MR_IP, RedisConnEnvConst.CONN_MR_PORT, "csm-redis-master")
        slave_ip, slave_port = _resolve_endpoint(
            str_slave_ip_port, RedisConnEnvConst.CONN_SL_IP, RedisConnEnvConst.CONN_SL_PORT, "csm-redis-slave")

        # Slave reuses the master's password and db settings (there are no
        # separate slave variables in the original logic).
        redis_auth = _resolve(str_redis_auth, RedisConnEnvConst.CONN_MR_AUTH, _DEFAULT_REDIS_AUTH)
        redis_index = _resolve(str_redis_db, RedisConnEnvConst.CONN_MR_DB, _DEFAULT_REDIS_DB)

        master_dict_conn = {
            RedisConnConst.CONN_IP: master_ip,
            RedisConnConst.CONN_PORT: master_port,
            RedisConnConst.CONN_AUTH: redis_auth,
            RedisConnConst.CONN_DB: redis_index,
        }
        slave_dict_conn = {
            RedisConnConst.CONN_IP: slave_ip,
            RedisConnConst.CONN_PORT: slave_port,
            RedisConnConst.CONN_AUTH: redis_auth,
            RedisConnConst.CONN_DB: redis_index,
        }
        return {
            RedisInfoKeysConst.REDIS_MASTER_KEY: master_dict_conn,
            RedisInfoKeysConst.REDIS_SLAVE_KEY: slave_dict_conn,
        }

    def get_redis_sentinel_info(self, list_sentinels: Optional[list] = None, str_clustername: Optional[str] = None,
                                str_redis_auth: Optional[str] = None, str_redis_db: Optional[str] = None) -> dict:
        """Build the connection info for a Redis Sentinel cluster.

        Each setting is taken from the argument when it is not ``None``,
        otherwise from its environment variable (when set and non-empty),
        otherwise from a built-in default.

        Args:
            list_sentinels: sentinel nodes as a list of ``(host, port)`` tuples.
            str_clustername: name of the sentinel cluster.
            str_redis_auth: Redis password.
            str_redis_db: Redis database index.

        Returns:
            Dict holding the sentinel hosts and cluster name under the
            ``RedisSentinelConnConst`` keys, and the password and db index
            under the ``RedisConnConst`` auth/db keys.
        """
        # NOTE(review): a value read from the environment is the raw string,
        # whereas the built-in default is a list of (host, port) tuples;
        # confirm that consumers accept both shapes.
        sentinel_cluster_hosts = _resolve(
            list_sentinels, RedisSentinelConnEnvConst.CONN_HOSTS, [('redis-sentinel', '26379')])
        sentinel_cluster_name = _resolve(
            str_clustername, RedisSentinelConnEnvConst.CONN_CLUSTERNAME, "csmredis")
        redis_auth = _resolve(str_redis_auth, RedisSentinelConnEnvConst.CONN_AUTH, _DEFAULT_REDIS_AUTH)
        redis_index = _resolve(str_redis_db, RedisSentinelConnEnvConst.CONN_DB, _DEFAULT_REDIS_DB)

        return {
            RedisSentinelConnConst.CONN_HOSTS: sentinel_cluster_hosts,
            RedisSentinelConnConst.CONN_CLUSTERNAME: sentinel_cluster_name,
            RedisConnConst.CONN_AUTH: redis_auth,
            RedisConnConst.CONN_DB: redis_index,
        }