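#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
@Description: Tiangong access verification (SSO login check, config-center
              region check and IAM permission checks).
@Author: guohb65
@Email: guohb65@chinaunicom.cn
@Date: 2020/4/30 9:57
@LastEditors: guohb65
@LastEditTime: 2020/4/30 9:57
'''
import datetime
import json
import os

import requests
import urllib3
from flask import request, g

from cucc_common_pkg.util_pkg.common_func import CommonFunc
from cucc_common_pkg.util_pkg.const import RequestMethod, ConstGen, ResponseCode

# Keys looked up in the common JSON config file.
TG_DOMAIN = "TG_DOMAIN"
CONFIG_CENTER_DOMAIN = "CONFIG_CENTER_DOMAIN"
IAM_DOMAIN = "IAM_DOMAIN"
ACCESS_TOKEN = "accessToken"

# Endpoint paths appended to the configured domains.
SSO_URL = "/sso/v1/users/info"
CSM_CONFIG_REGION_URL = "/configcenter/v1/product/csm/regions"
CKE_CONFIG_REGION_URL = "/configcenter/v1/product/cke/regions"
IAM_URL = "/iam/v1/checkuserhaspermission"
IAM_OBJ_URL = "/iam/v2/checkuserhaspermissionwithinstanceattribution"
IAM_LIST_URL = "/iam/v2/checkuserhaspermissionandgetinstancelist"

# Query-parameter and JSON field names used on the wire.
REGION_CODE = "regionCode"
IS_AUTHORIZED = "isAuthorized"
PRODUCT_CODE = "productCode"
ACTION_CODE = "actionCode"
CRN = "crn"
USER_HAS_PERMISSION = "userHasPermission"
INSTANCE_LIST = "instanceList"

# Upper bound for every outbound call. Without it a stalled config-center or
# IAM endpoint would block the request that is waiting on the check; a timeout
# lands in the existing ``except`` branches, which deny.
REQUEST_TIMEOUT_SECONDS = 5


class LoginVerify(object):
    """Per-request login and permission checks.

    The caller's cookies are forwarded to the SSO, config-center and IAM
    endpoints, and the answers decide whether the request may proceed.

    NOTE(review): every outbound call passes ``verify=False`` and silences
    urllib3 warnings process-wide, so the endpoint's TLS certificate is not
    validated while the user's session cookies are sent to it and while its
    answer is trusted as an authorization decision. Left unchanged here
    because the deployment's certificate setup is not visible from this file;
    preferably pass the internal CA bundle path via ``verify=`` instead.
    """

    def __init__(self):
        """Load the service domains and capture the current request's cookies.

        The config file path is read from the environment variable named by
        ``ConstGen.COMMON_MOUNT_PATH``; the file must hold a JSON object.
        Reads ``flask.request``, so it must run inside a request context.
        """
        common_mount_path = os.environ.get(ConstGen.COMMON_MOUNT_PATH)
        # Context manager so the handle is released even if reading or parsing fails.
        with open(common_mount_path, 'r') as common_conf_file:
            common_data_json = json.load(common_conf_file)
        tg_domain = common_data_json.get(TG_DOMAIN)
        self.config_center_domain = common_data_json.get(CONFIG_CENTER_DOMAIN)
        self.iam_domain = common_data_json.get(IAM_DOMAIN)
        self.sso_url = tg_domain + SSO_URL
        self.cookies = request.cookies

    @staticmethod
    def _json_body(resp):
        """Return the JSON object carried by *resp*, or None.

        None is returned when the Content-Type header is missing or is not
        JSON, when the body does not decode, or when it is not a JSON object.
        Callers treat None as "no usable answer" instead of raising.
        """
        content_type = resp.headers.get(ConstGen.CONTENT_TYPE) or ""
        if ConstGen.CONTENT_TYPE_JSON not in content_type:
            return None
        try:
            body = json.loads(resp.text)
        except (TypeError, ValueError):
            return None
        return body if isinstance(body, dict) else None

    def is_login(self):
        """Ask SSO whether the forwarded cookies belong to a logged-in user.

        Returns:
            ``(True, None)`` on success, after storing the SSO user record in
            ``g.user_info`` and the cookies in ``g.cookies``; otherwise
            ``(False, <response data>)``.
        """
        urllib3.disable_warnings()
        start_time = None
        try:
            print(f"请求SSO开始:GET {self.sso_url}")
            start_time = datetime.datetime.now()
            resp = requests.request(RequestMethod.GET.value, self.sso_url, cookies=self.cookies,
                                    verify=False, timeout=REQUEST_TIMEOUT_SECONDS)
            end_time = datetime.datetime.now()
            consume_ms = (end_time - start_time).total_seconds() * 1000
            print(f"请求SSO正常结束,耗时 {consume_ms} 豪秒:GET {self.sso_url}")
        except Exception as e:
            if start_time:
                end_time = datetime.datetime.now()
                consume_ms = (end_time - start_time).total_seconds() * 1000
                print(f"请求SSO异常结束,耗时 {consume_ms} 豪秒:e={e},GET {self.sso_url}")
            else:
                print(f"请求SSO异常结束:e={e},GET {self.sso_url}")
            return False, CommonFunc().fabricate_response_data(ConstGen.SUCCESS_CODE,
                                                               ResponseCode.Failure.value, "SSO请求异常")
        if resp.status_code == 200:
            ret_data = self._json_body(resp)
            if ret_data is not None and ResponseCode.OK.value == ret_data.get(ConstGen.CODE):
                g.user_info = ret_data.get(ConstGen.DATA_STR)
                g.cookies = self.cookies
                # NOTE(review): this writes the whole SSO user record to stdout;
                # confirm it holds nothing that should stay out of the logs.
                print("SSO返回用户信息:" + g.user_info.__str__())
                return True, None
        return False, CommonFunc().user_info_error()

    def config_center_verify(self, region_id):
        """Check with the config center that *region_id* is an authorized region.

        Returns:
            ``True`` only when the config center answers OK with a non-empty
            data payload; ``False`` in every other case, including request
            failures. The failure path used to return a ``(False, data)``
            tuple, which is truthy and would pass a plain ``if`` test.
        """
        # requests drops None-valued params, which would turn this into an
        # unfiltered region listing; refuse anything but a non-empty string.
        if not isinstance(region_id, str) or not region_id:
            return False
        config_center_url = self.config_center_domain + CSM_CONFIG_REGION_URL
        # Passed via ``params`` so the value is URL-encoded and cannot add or
        # override query parameters of the authorization lookup.
        query = {REGION_CODE: region_id, IS_AUTHORIZED: "true"}
        urllib3.disable_warnings()
        try:
            resp = requests.request(RequestMethod.GET.value, config_center_url, params=query,
                                    cookies=self.cookies, verify=False, timeout=REQUEST_TIMEOUT_SECONDS)
        except Exception as e:
            print("配置中心请求异常:" + e.__str__())
            return False
        if resp.status_code == 200:
            ret_data = self._json_body(resp)
            if ret_data is not None:
                print(ret_data)
                if ResponseCode.OK.value == ret_data.get(ConstGen.CODE):
                    data = ret_data.get(ConstGen.DATA_STR)
                    # A missing or unsized payload counts as "not authorized".
                    return hasattr(data, "__len__") and len(data) > 0
        print("配置中心请求返回:" + resp.text)
        return False

    def config_center_region_list(self):
        """Fetch the authorized region list from the config center.

        Returns:
            The raw response text (``str``) on HTTP 200, otherwise the tuple
            ``(False, <response data>)``; callers must tell the two apart by type.
        """
        config_center_url = self.config_center_domain + CSM_CONFIG_REGION_URL
        urllib3.disable_warnings()
        try:
            resp = requests.request(RequestMethod.GET.value, config_center_url,
                                    params={IS_AUTHORIZED: "true"}, cookies=self.cookies,
                                    verify=False, timeout=REQUEST_TIMEOUT_SECONDS)
            print("配置中心请求返回:" + resp.text)
        except Exception as e:
            print("配置中心请求异常:" + e.__str__())
            return False, CommonFunc().fabricate_response_data(ConstGen.SUCCESS_CODE,
                                                               ResponseCode.Failure.value, "配置中心请求异常")
        if resp.status_code == 200:
            return resp.text
        return False, CommonFunc().fabricate_response_data(ConstGen.SUCCESS_CODE,
                                                           ResponseCode.Failure.value, "配置中心请求异常")

    def iam_verify(self, region_id, action_code, instance_name=None):
        """Ask IAM whether the current user may perform *action_code*.

        The resource is the CRN ``crn:ucs:*:csm:<region>:<account>:<instance>``,
        with ``*`` as the instance when *instance_name* is None. The account id
        comes from ``g.user_info``, so ``is_login`` must have succeeded first.

        Returns:
            ``True`` only when IAM answers OK with ``userHasPermission`` set to
            JSON ``true``; ``False`` otherwise.
        """
        account_id = g.user_info.get(ConstGen.ACCOUNT_ID_STR)
        # Deny rather than send a request with missing pieces (requests drops
        # None-valued params).
        required = (region_id, action_code, account_id)
        if not all(isinstance(value, str) and value for value in required):
            return False
        if instance_name is not None and not isinstance(instance_name, str):
            return False
        # NOTE(review): region_id and instance_name go into the CRN verbatim; if
        # they originate from the request, confirm ':' and '*' are rejected upstream.
        resource = "*" if instance_name is None else instance_name
        crn_pattern = "crn:ucs:*:csm:" + region_id + ":" + account_id + ":" + resource
        iam_request_url = self.iam_domain + IAM_OBJ_URL
        # ``params`` URL-encodes each value, so '&' or '=' inside them cannot
        # smuggle extra parameters into the permission query.
        query = {PRODUCT_CODE: "csm", ACTION_CODE: action_code, CRN: crn_pattern}
        urllib3.disable_warnings()
        try:
            resp = requests.request(RequestMethod.GET.value, iam_request_url, params=query,
                                    cookies=self.cookies, verify=False, timeout=REQUEST_TIMEOUT_SECONDS)
        except Exception as e:
            print("IAM请求异常:" + e.__str__())
            return False
        if resp.status_code == 200:
            ret_data = self._json_body(resp)
            if ret_data is not None and ResponseCode.OK.value == ret_data.get(ConstGen.CODE):
                data = ret_data.get(ConstGen.DATA_STR)
                # Same strict test as iam_filter: only a JSON ``true`` grants access.
                return isinstance(data, dict) and data.get(USER_HAS_PERMISSION) is True
        print("IAM请求返回:" + resp.text)
        return False

    def iam_filter(self, region_id, action_code, filter_key, instances: list):
        """Keep only the *instances* the current user may act on, according to IAM.

        Each instance is mapped to a CRN built from ``instance[filter_key]``;
        the CRNs are posted to IAM and only instances whose CRN IAM returns are
        kept. CRNs in the answer that were never sent are ignored.

        Returns:
            ``(True, allowed_instances)`` when IAM grants permission,
            ``(False, <message>)`` when the request fails or permission is
            denied, and ``(True, [])`` for any other response.
        """
        account_id = g.user_info.get(ConstGen.ACCOUNT_ID_STR)
        iam_request_url = self.iam_domain + IAM_LIST_URL
        urllib3.disable_warnings()
        instances_by_crn = {}
        crn_list = []
        for instance in instances:
            crn_pattern = "crn:ucs:*:csm:" + region_id + ":" + account_id + ":" + instance[filter_key]
            instances_by_crn.setdefault(crn_pattern, []).append(instance)
            crn_list.append({CRN: crn_pattern})
        body_data = {PRODUCT_CODE: "csm", ACTION_CODE: action_code, INSTANCE_LIST: crn_list}
        try:
            resp = requests.request(RequestMethod.POST.value, iam_request_url, json=body_data,
                                    cookies=self.cookies, verify=False, timeout=REQUEST_TIMEOUT_SECONDS)
        except Exception as e:
            print("IAM请求异常:" + e.__str__())
            return False, "IAM请求异常"
        if resp.status_code == 200:
            ret_data = self._json_body(resp)
            if ret_data is not None:
                data = ret_data.get(ConstGen.DATA_STR)
                permitted = (ResponseCode.OK.value == ret_data.get(ConstGen.CODE)
                             and isinstance(data, dict)
                             and data.get(USER_HAS_PERMISSION) is True)
                if not permitted:
                    return False, "无权限"
                last_list = []
                for item in data.get(INSTANCE_LIST) or []:
                    crn = item.get(CRN) if isinstance(item, dict) else None
                    if isinstance(crn, str) and crn in instances_by_crn:
                        # pop() so a CRN repeated in the answer is counted once.
                        last_list.extend(instances_by_crn.pop(crn))
                return True, last_list
        # NOTE(review): a non-200 or non-JSON IAM answer reports success with an
        # empty list rather than an error; no instance is exposed, but callers
        # cannot tell an IAM outage from "nothing visible". Kept as-is.
        return True, []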