login_verify.py 8.77 KB
Newer Older
qunfeng qiu's avatar
qunfeng qiu committed

#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
@Description: Verification helpers for integrating with the Tiangong (天宫) platform
@Author: guohb65
@Email: guohb65@chinaunicom.cn
@Date: 2020/4/30 9:57
@LastEditors: guohb65
@LastEditTime: 2020/4/30 9:57
'''
import datetime
import json
import os

import requests
import urllib3
from flask import request, g

from cucc_common_pkg.util_pkg.common_func import CommonFunc
from cucc_common_pkg.util_pkg.const import RequestMethod, ConstGen, ResponseCode

# Keys looked up in the JSON config file that LoginVerify.__init__ loads.
TG_DOMAIN = "TG_DOMAIN"
CONFIG_CENTER_DOMAIN = "CONFIG_CENTER_DOMAIN"
IAM_DOMAIN = "IAM_DOMAIN"
# Not referenced in the visible part of this module.
ACCESS_TOKEN = "accessToken"
# Path appended to the TG domain to build the SSO user-info URL.
SSO_URL = "/sso/v1/users/info"
# Config-center region endpoints; only the CSM one is used by LoginVerify
# (the CKE one is not referenced in the visible part of this module).
CSM_CONFIG_REGION_URL = "/configcenter/v1/product/csm/regions"
CKE_CONFIG_REGION_URL = "/configcenter/v1/product/cke/regions"
# IAM endpoints: IAM_OBJ_URL is used by LoginVerify.iam_verify and IAM_LIST_URL
# by LoginVerify.iam_filter; IAM_URL is not referenced in the visible part of
# this module.
IAM_URL = "/iam/v1/checkuserhaspermission"
IAM_OBJ_URL = "/iam/v2/checkuserhaspermissionwithinstanceattribution"
IAM_LIST_URL = "/iam/v2/checkuserhaspermissionandgetinstancelist"
# Query-parameter names sent to the config center.
REGION_CODE = "regionCode"
IS_AUTHORIZED = "isAuthorized"
# Parameter / JSON-body key names sent to IAM.
PRODUCT_CODE = "productCode"
ACTION_CODE = "actionCode"
CRN = "crn"
# Key read from the "data" object of an IAM response.
USER_HAS_PERMISSION = "userHasPermission"

# Key holding the list of CRN entries in the IAM list request body.
INSTANCE_LIST = "instanceList"


class LoginVerify(object):
    """Per-request client for the SSO, config-center and IAM checks.

    An instance must be created inside a Flask request context because
    ``__init__`` captures ``flask.request.cookies``; those cookies are
    forwarded on every outgoing call.

    NOTE: every outgoing request is sent with ``verify=False`` (TLS
    certificate verification disabled) and urllib3 warnings are silenced.
    Only the SSO call sets a timeout; the config-center and IAM calls have
    none and can block for as long as the remote side keeps the connection
    open.
    """

    def __init__(self):
        """Load the service domains from the mounted JSON config file.

        The file path is read from the environment variable named by
        ``ConstGen.COMMON_MOUNT_PATH``.  The cookies of the current Flask
        request are stored for later forwarding.
        """
        common_mount_path = os.environ.get(ConstGen.COMMON_MOUNT_PATH)
        # ``with`` closes the handle even when reading or JSON parsing raises.
        with open(common_mount_path, 'r') as common_conf_file:
            common_data_json = json.load(common_conf_file)
        tg_domain = common_data_json.get(TG_DOMAIN)
        self.config_center_domain = common_data_json.get(CONFIG_CENTER_DOMAIN)
        self.iam_domain = common_data_json.get(IAM_DOMAIN)
        self.sso_url = tg_domain + SSO_URL
        self.cookies = request.cookies

    @staticmethod
    def _load_json_body(resp):
        """Return the JSON object carried by *resp*, or None if unusable.

        None is returned when the Content-Type header is missing or does not
        contain ``ConstGen.CONTENT_TYPE_JSON``, when the body is not valid
        JSON, or when the decoded value is not a dict.
        """
        # A missing header yields None; normalise it so ``in`` cannot raise.
        content_type = resp.headers.get(ConstGen.CONTENT_TYPE) or ""
        if ConstGen.CONTENT_TYPE_JSON not in content_type:
            return None
        try:
            body = json.loads(resp.text)
        except ValueError:  # json.JSONDecodeError is a ValueError subclass
            return None
        return body if isinstance(body, dict) else None

    def is_login(self):
        """Check the forwarded cookies against the SSO user-info endpoint.

        On success the user info returned by SSO is stored in ``g.user_info``
        and the cookies in ``g.cookies``.

        Returns:
            tuple: ``(True, None)`` when SSO answers HTTP 200 with a JSON
            body whose code equals ``ResponseCode.OK.value``;
            ``(False, <error response>)`` otherwise.
        """
        urllib3.disable_warnings()
        start_time = None
        try:
            print(f"请求SSO开始:GET {self.sso_url}")
            start_time = datetime.datetime.now()
            resp = requests.request(RequestMethod.GET.value, self.sso_url,
                                    cookies=self.cookies, verify=False, timeout=5)
            end_time = datetime.datetime.now()
            consume_ms = (end_time - start_time).total_seconds() * 1000
            print(f"请求SSO正常结束,耗时 {consume_ms} 豪秒:GET {self.sso_url}")
        except Exception as e:
            # start_time is still None if the failure happened before the
            # request was started, in which case no duration can be reported.
            if start_time:
                end_time = datetime.datetime.now()
                consume_ms = (end_time - start_time).total_seconds() * 1000
                print(f"请求SSO异常结束,耗时 {consume_ms} 豪秒:e={e},GET {self.sso_url}")
            else:
                print(f"请求SSO异常结束:e={e},GET {self.sso_url}")
            return False, CommonFunc().fabricate_response_data(ConstGen.SUCCESS_CODE, ResponseCode.Failure.value,
                                                               "SSO请求异常")

        if resp.status_code == 200:
            ret_data = self._load_json_body(resp)
            if ret_data is not None and ResponseCode.OK.value == ret_data.get(ConstGen.CODE):
                g.user_info = ret_data.get(ConstGen.DATA_STR)
                g.cookies = self.cookies
                print("SSO返回用户信息:" + str(g.user_info))
                return True, None
        return False, CommonFunc().user_info_error()

    def config_center_verify(self, region_id):
        """Ask the config center whether *region_id* is an authorized region.

        Args:
            region_id (str): region code sent as the ``regionCode`` query
                parameter.

        Returns:
            bool: True only when the config center answers HTTP 200 with a
            JSON body whose code equals ``ResponseCode.OK.value`` and whose
            data is non-empty; False in every other case, including request
            failures.
        """
        config_center_url = self.config_center_domain + CSM_CONFIG_REGION_URL + "?" \
                            + REGION_CODE + "=" + region_id + "&" + IS_AUTHORIZED + "=" + "true"
        urllib3.disable_warnings()
        try:
            resp = requests.request(RequestMethod.GET.value, config_center_url,
                                    cookies=self.cookies, verify=False)
        except Exception as e:
            print("配置中心请求异常:" + str(e))
            # Every other path returns a plain bool.  This path used to return
            # a (False, response) tuple, which is truthy and therefore made a
            # failed request look like a successful verification.
            return False

        if resp.status_code == 200:
            ret_data = self._load_json_body(resp)
            if ret_data is not None:
                print(ret_data)
                if ResponseCode.OK.value == ret_data.get(ConstGen.CODE):
                    # Missing or empty data means the region is not authorized.
                    return bool(ret_data.get(ConstGen.DATA_STR))
        print("配置中心请求返回:" + resp.text)
        return False

    def config_center_region_list(self):
        """Fetch the authorized region list from the config center.

        Returns:
            str | tuple: the raw response text when the config center answers
            HTTP 200; ``(False, <error response>)`` when the request raises
            or the status is not 200.  Callers have to distinguish the two
            shapes themselves.
        """
        config_center_url = self.config_center_domain + CSM_CONFIG_REGION_URL + "?" + IS_AUTHORIZED + "=" + "true"
        urllib3.disable_warnings()
        try:
            resp = requests.request(RequestMethod.GET.value, config_center_url,
                                    cookies=self.cookies, verify=False)
            print("配置中心请求返回:" + resp.text)
        except Exception as e:
            print("配置中心请求异常:" + str(e))
            return False, CommonFunc().fabricate_response_data(ConstGen.SUCCESS_CODE, ResponseCode.Failure.value,
                                                               "配置中心请求异常")
        if resp.status_code == 200:
            return resp.text
        return False, CommonFunc().fabricate_response_data(ConstGen.SUCCESS_CODE, ResponseCode.Failure.value,
                                                           "配置中心请求异常")

    def iam_verify(self, region_id, action_code, instance_name=None):
        """Ask IAM whether the current user may perform *action_code*.

        Reads the account id from ``g.user_info``, so ``is_login`` must have
        populated it beforehand.

        Args:
            region_id (str): region part of the CRN.
            action_code (str): IAM action code to check.
            instance_name (str | None): instance part of the CRN; ``"*"`` is
                used when None.

        Returns:
            The ``userHasPermission`` value from the IAM response data when
            IAM answers HTTP 200 with a JSON body whose code equals
            ``ResponseCode.OK.value``; False otherwise.
        """
        account_id = g.user_info.get(ConstGen.ACCOUNT_ID_STR)
        crn_pattern = "crn:ucs:*:csm:" + region_id + ":" + account_id + ":"
        crn_pattern += "*" if instance_name is None else instance_name
        iam_request_url = self.iam_domain + IAM_OBJ_URL + "?" \
                          + PRODUCT_CODE + "=csm&" + ACTION_CODE + "=" + action_code + "&" + CRN + "=" + crn_pattern
        urllib3.disable_warnings()
        try:
            resp = requests.request(RequestMethod.GET.value, iam_request_url,
                                    cookies=self.cookies, verify=False)
        except Exception as e:
            print("IAM请求异常:" + str(e))
            return False
        if resp.status_code == 200:
            ret_data = self._load_json_body(resp)
            if ret_data is not None and ResponseCode.OK.value == ret_data.get(ConstGen.CODE):
                permission_data = ret_data.get(ConstGen.DATA_STR)
                # A missing or null data object is treated as "no permission"
                # instead of raising.
                if isinstance(permission_data, dict):
                    return permission_data.get(USER_HAS_PERMISSION)
        print("IAM请求返回:" + resp.text)
        return False

    def iam_filter(self, region_id, action_code, filter_key, instances: list):
        """Keep only the instances IAM reports back for *action_code*.

        A CRN is built for each instance from ``instance[filter_key]`` and
        the whole list is posted to IAM in one request.  Reads the account id
        from ``g.user_info``.

        Args:
            region_id (str): region part of every CRN.
            action_code (str): IAM action code to check.
            filter_key: key of each instance whose value forms the last CRN
                segment.
            instances (list): instances to filter; each must support
                ``instance[filter_key]``.

        Returns:
            tuple: ``(True, <instances whose CRN IAM returned>)`` when IAM
            grants permission; ``(False, <message>)`` when the request raises
            or IAM answers with JSON that does not grant permission;
            ``(True, [])`` when IAM answers with a non-200 status or a body
            that is not a JSON object.
        """
        account_id = g.user_info.get(ConstGen.ACCOUNT_ID_STR)
        iam_request_url = self.iam_domain + IAM_LIST_URL
        urllib3.disable_warnings()
        # Several instances may share one CRN, so group them under it.
        instances_by_crn = {}
        crn_list = []
        for instance in instances:
            crn_pattern = "crn:ucs:*:csm:" + region_id + ":" + account_id + ":" + instance[filter_key]
            instances_by_crn.setdefault(crn_pattern, []).append(instance)
            crn_list.append({CRN: crn_pattern})
        body_data = {
            PRODUCT_CODE: "csm",
            ACTION_CODE: action_code,
            INSTANCE_LIST: crn_list,
        }
        try:
            resp = requests.request(RequestMethod.POST.value, iam_request_url, json=body_data,
                                    cookies=self.cookies, verify=False)
        except Exception as e:
            print("IAM请求异常:" + str(e))
            return False, "IAM请求异常"
        if resp.status_code == 200:
            ret_data = self._load_json_body(resp)
            if ret_data is not None:
                permission_data = ret_data.get(ConstGen.DATA_STR)
                if (ResponseCode.OK.value == ret_data.get(ConstGen.CODE)
                        and isinstance(permission_data, dict)
                        and permission_data.get(USER_HAS_PERMISSION) is True):
                    last_list = []
                    # A missing or null instance list is treated as empty.
                    for data_i in permission_data.get(INSTANCE_LIST) or []:
                        # pop() so a CRN repeated in the response cannot add
                        # the same instances twice.
                        last_list.extend(instances_by_crn.pop(data_i.get(CRN), []))
                    return True, last_list
                return False, "无权限"
        # NOTE(review): non-200 and non-JSON responses report success with an
        # empty list; kept as-is for compatibility with existing callers.
        return True, []