#!/usr/bin/env python
# -*- encoding: utf-8 -*-
"""
@Version: 1.0
@Python Version:3.6.6
@Author: ludq1
@Email: ludq1@chinaunicom.cn
@date: 2023/04/07 11:40:00
@Description: Login handling for Flask services: credential verification
    through a backend procedure call, session storage in Redis, and a
    decorator that rejects requests whose session cookie is unknown.
"""
import copy
import json
import uuid
from functools import wraps

from flask import current_app, request

from .cfginfohandle import CfgInfoHandle
from .globalconst import CookiesConstKey, FlaskConfigIDConst, RedisConnConst, RedisDbNumConst, GlobalConst, \
    GlobalRetKeyConst, RedisInfoKeysConst
from .globalerror import CommonError
from .my_baseexception import MyBaseException
from .my_servicehandle import ServiceHandle
from .redishanlde import RedisHandle


class LoginHandle:
    """Verify user credentials and create login sessions."""

    # Filter rule
    FILTER_RULE_NAME = "filter_name"
    # Concrete filter value
    FILTER_RULE_VALUE = "filter_val"
    # Exact match
    FILTER_EXACT = "filter_exact"
    # Regular-expression match
    FILTER_REG = "filter_regular"

    def __init__(self, flask_app=None, list_filter_url: list = None):
        r"""
        Initialise the login handler.

        Args:
            flask_app: Flask app; when given, ``init_app`` is called with it.
            list_filter_url: URL filters that do not need the login check,
                as a list[dict]. Stored as-is (``None`` when omitted).
        """
        self.list_filter_url = list_filter_url
        self.app = flask_app
        if flask_app is not None:
            self.init_app(flask_app)

    def init_app(self, app):
        r"""
        Hook for binding the handler to a Flask app.

        Currently a no-op: registering a ``before_request`` login check on
        every route is disabled.

        Args:
            app: Flask app.

        Returns:
            None
        """
        # NOTE: intentionally does nothing; the before_request registration is disabled.

    def login(self, login_name: str = None, login_pass: str = None, login_code: str = None, tenant_id: str = None,
              proc_name: str = "auth_userlogin_proc", login_srv_url: str = None, redis_info: dict = None) -> tuple:
        r"""
        Verify the credentials through a procedure call and open a session.

        Args:
            login_name: Login account.
            login_pass: Login password.
            login_code: Login verification code (not used by this method).
            tenant_id: Tenant ID.
            proc_name: Name of the procedure to call.
            login_srv_url: URL of the login service.
            redis_info: Redis connection info passed to ``RedisHandle.set_vals``.

        Returns:
            ``(ret_json, jsessionid)``. ``ret_json`` is the raw procedure
            response. ``jsessionid`` is the new 32-character hex session id,
            or ``None`` when the call failed or returned no data rows.
        """
        proc_args = {
            "arg_tenantid": tenant_id,
            "arg_username": login_name,
            "arg_userpass": login_pass,
        }

        sh = ServiceHandle()
        ret_json: dict = sh.call_proc(service_url=login_srv_url, proc_name=proc_name, arg_dict=proc_args)

        # Failure: the call was unsuccessful, or the row list is missing or empty.
        if sh.check_is_success(ret_json) is False or not ret_json.get(GlobalRetKeyConst.DATAROWS):
            return ret_json, None

        user_info: dict = ret_json[GlobalRetKeyConst.DATAROWS][0]

        # The session id is an authentication token, so it must be unpredictable.
        # uuid1() is built from the host id and a timestamp; uuid4() is random.
        jsessionid = uuid.uuid4().hex

        # Verification succeeded: cache the user record under the session id.
        rh: RedisHandle = current_app.config[FlaskConfigIDConst.REDIS_CLASS_ID]
        rh.set_vals(dict_redisinfo=redis_info,
                    dict_kv={jsessionid: json.dumps(user_info, ensure_ascii=False)})

        return ret_json, jsessionid


def check_login(func):
    r"""
    Decorator that only runs the wrapped view for a logged-in request.

    A request counts as logged in when it carries the session cookie
    (``CookiesConstKey.LOGIN_SESSTION_KEY``) and that session id is present
    in the result of ``RedisHandle.get_vals``.

    Args:
        func: View function to protect.

    Returns:
        The wrapping function. For a request that is not logged in it returns
        the UTF-8 encoded JSON of the standard "user not logged in" error
        instead of calling ``func``.
    """

    @wraps(func)
    def wrapper(*args, **kwargs):
        if_login = False

        if CookiesConstKey.LOGIN_SESSTION_KEY in request.cookies:
            rh: RedisHandle = current_app.config[FlaskConfigIDConst.REDIS_CLASS_ID]
            cfg_handle: CfgInfoHandle = current_app.config[FlaskConfigIDConst.INIT_CONFIG_ID]

            # Copy the connection settings before selecting the login-info DB
            # so the dict held by the config handler is not modified.
            dict_conn: dict = copy.copy(cfg_handle.get_redis_info()[RedisInfoKeysConst.REDIS_SLAVE_KEY])
            dict_conn[RedisConnConst.CONN_DB] = RedisDbNumConst.LOGIN_INFO

            csm_id = request.cookies[CookiesConstKey.LOGIN_SESSTION_KEY]
            ret_dict = rh.get_vals({csm_id: csm_id}, dict_conn)
            if_login = csm_id in ret_dict

        if if_login:
            return func(*args, **kwargs)

        return json.dumps(
            MyBaseException.format_to_standard_dict(ret_code=GlobalConst.RETCODE_COMMON_ERROR,
                                                    ret_val=CommonError.ERROR_USER_UNLOGIN),
            ensure_ascii=False).encode('utf-8')

    return wrapper