"""
@Description: Common utility functions
@Author: guohb65
@Email: guohb65@chinaunicom.cn
@Date: 2019-12-10 10:58:53
@LastEditTime : 2019-12-20 14:51:44
@LastEditors : guohb65
"""
import datetime
import json
import os

import pytz
import requests
import urllib3
from flask import make_response
from cucc_common_pkg import httptrans

from .const import ConstGen, ResponseCode, K8SOptJsonStr, K8SRestStr

CSM_BS_CODE = 'csm-bs-code'
CONTENT_TYPE = "Content-Type"
CONTENT_TYPE_JSON = "application/json"
RET_DATA = "data"
ACCESS_MSG = "accessMsg"
MESSAGE = "message"
CODE = "code"
CA_FLAG = "ca_flag"
HEADERS = "headers"
DATA_STR = "data"
PARAMS_STR = "params"
TZ = "Asia/Shanghai"


class CommonFunc(object):
    """
    Shared helpers: parameter checks, response construction, time
    formatting, and wrappers around outgoing HTTP / REST calls.
    """

    def params_verification(self, arg_params, verify_key_list):
        """
        Check that every required key is present in the parameters.

        :param arg_params: container of request parameters, or None
        :param verify_key_list: iterable of keys that must be in arg_params
        :return: (True, None) when all keys are present, otherwise
                 (False, response) where response is the "missing
                 parameter" response built by fabricate_response_data
        """
        if arg_params is None or any(
                key not in arg_params for key in verify_key_list):
            return False, self.fabricate_response_data(
                ConstGen.FAIL_CODE,
                ResponseCode.MissingParameter.value,
                "参数不存在")
        return True, None

    @staticmethod
    def gen_resp_data(code, msg, data_list=None):
        """
        Build a plain result dict keyed by the ConstGen field names.

        :param code: value stored under ConstGen.CODE
        :param msg: value stored under ConstGen.MESSAGE
        :param data_list: optional payload stored under ConstGen.DATA_STR;
                          the key is omitted when this is None
        :return: the assembled dict
        """
        data = {ConstGen.CODE: code, ConstGen.MESSAGE: msg}
        if data_list is not None:
            data[ConstGen.DATA_STR] = data_list
        return data

    def return_data(self, ret_data):
        """
        Convert an inner result (legacy data format) into an HTTP response.

        :param ret_data: inner result; ConstGen.RET_CODE, ConstGen.RET_VAL
                         and ConstGen.DATA_ROWS_STR are read when present
        :return: the response built by fabricate_response_data
        """
        if ConstGen.RET_CODE in ret_data:
            csm_bs_code = ret_data[ConstGen.RET_CODE]
        else:
            csm_bs_code = ConstGen.FAIL_CODE

        # Without a RET_VAL entry the whole input is used as the message.
        if ConstGen.RET_VAL in ret_data:
            ret_message = ret_data[ConstGen.RET_VAL]
        else:
            ret_message = ret_data

        # Compare by value: an identity check ("is") only matches when both
        # sides happen to be the very same object, which is not guaranteed
        # for codes that arrive via JSON decoding.
        if csm_bs_code == ConstGen.SUCCESS_CODE:
            ret_code = ResponseCode.OK.value
        else:
            ret_code = ResponseCode.Failure.value

        if "1" == ret_message:
            ret_message = ConstGen.SUCCESS_STR

        data_row_list = self.return_data_list(ret_data)
        return self.fabricate_response_data(
            csm_bs_code, ret_code, ret_message, None, data_row_list)

    def return_data_list(self, ret_data):
        """
        Extract the data rows from an inner result.

        :param ret_data: inner result
        :return: the value under ConstGen.DATA_ROWS_STR, or an empty list
                 when that key is absent
        """
        if ConstGen.DATA_ROWS_STR in ret_data:
            return ret_data[ConstGen.DATA_ROWS_STR]
        return []

    def user_info_error(self):
        """
        Build the response used when user information is unavailable.

        :return: a failure response carrying ResponseCode.AuthFailure
        """
        return self.fabricate_response_data(
            ConstGen.FAIL_CODE,
            ResponseCode.AuthFailure.value,
            "用户信息不存在",
            "用户未登录")

    @staticmethod
    def return_data_direct(ret_data):
        """
        Wrap already-prepared data in a response marked as JSON.

        :param ret_data: body passed unchanged to flask.make_response
        :return: the response with its Content-Type header set to JSON
        """
        response = make_response(ret_data)
        response.headers[CONTENT_TYPE] = CONTENT_TYPE_JSON
        return response

    @staticmethod
    def fabricate_response_data(csm_bs_code, ret_code, ret_message,
                                ret_access_msg=None, data_row_list=None):
        """
        Build the JSON response envelope.

        :param csm_bs_code: value written to the 'csm-bs-code' header
        :param ret_code: value stored under "code" in the body
        :param ret_message: value stored under "message" in the body
        :param ret_access_msg: optional value stored under "accessMsg";
                               omitted when None
        :param data_row_list: optional value stored under "data";
                              omitted when None
        :return: the response whose body is the JSON-encoded envelope
        """
        data = {CODE: ret_code, MESSAGE: ret_message}
        if ret_access_msg is not None:
            data[ACCESS_MSG] = ret_access_msg
        if data_row_list is not None:
            data[RET_DATA] = data_row_list
        # ensure_ascii=False keeps non-ASCII messages readable in the body.
        response = make_response(json.dumps(data, ensure_ascii=False))
        response.headers[CONTENT_TYPE] = CONTENT_TYPE_JSON
        response.headers[CSM_BS_CODE] = csm_bs_code
        return response

    @staticmethod
    def gen_inner_resp_data(ret_code, ret_val, data_row_list=None):
        """
        Build the inner (service-to-service) result dict.

        :param ret_code: value stored under ConstGen.RET_CODE
        :param ret_val: value stored under ConstGen.RET_VAL
        :param data_row_list: optional value stored under
                              ConstGen.DATA_ROWS_STR; omitted when None
        :return: the assembled dict
        """
        data = {ConstGen.RET_CODE: ret_code, ConstGen.RET_VAL: ret_val}
        if data_row_list is not None:
            data[ConstGen.DATA_ROWS_STR] = data_row_list
        return data

    @staticmethod
    def datetime_as_timezone(date_time, time_zone):
        """
        Interpret a datetime as UTC and convert it to another time zone.

        Any tzinfo already attached to date_time is replaced by UTC
        before the conversion.

        :param date_time: datetime whose wall-clock value is taken as UTC
        :param time_zone: time zone name accepted by pytz.timezone
        :return: timezone-aware datetime in the requested zone
        """
        target_tz = pytz.timezone(time_zone)
        utc = pytz.timezone('UTC')
        return date_time.replace(tzinfo=utc).astimezone(target_tz)

    @staticmethod
    def kube_time_format(date_time_str, format_pattern):
        """
        Parse a time string into a datetime.

        :param date_time_str: text to parse
        :param format_pattern: strptime format describing date_time_str
        :return: the datetime produced by datetime.datetime.strptime
        """
        return datetime.datetime.strptime(date_time_str, format_pattern)

    @staticmethod
    def datetime_format_tostr(date_time, format_pattern):
        """
        Render a datetime as text.

        :param date_time: value to format
        :param format_pattern: format specification applied to date_time
        :return: the formatted string
        """
        # format() applies the spec directly, so a pattern containing
        # braces cannot corrupt an intermediate "{0:...}" template.
        return format(date_time, format_pattern)

    @staticmethod
    def create_time_format(create_time_tz_str):
        """
        Convert a creation timestamp string to a string in the TZ zone.

        The input is parsed with ConstGen.pre_fmt, treated as UTC,
        converted to the module-level TZ zone and rendered with
        ConstGen.post_fmt.

        :param create_time_tz_str: timestamp text matching ConstGen.pre_fmt
        :return: the reformatted timestamp string
        """
        parsed = CommonFunc.kube_time_format(
            create_time_tz_str, ConstGen.pre_fmt)
        localized = CommonFunc.datetime_as_timezone(parsed, TZ)
        return CommonFunc.datetime_format_tostr(localized, ConstGen.post_fmt)

    @staticmethod
    def request_param2dict(request):
        """
        Copy the request parameters into a plain dict.

        :param request: object exposing a `values` mapping
        :return: dict mapping each key of request.values to
                 request.values.get(key)
        """
        return {k: request.values.get(k) for k in request.values}

    def request_trans_service(self, url, params):
        """
        POST parameters to a service and decode its JSON reply.

        :param url: target address
        :param params: data handed to HttpTrans.post_encodedata
        :return: the decoded JSON reply; when the call raises, an inner
                 result with code "0" and the error text
        """
        trans = httptrans.HttpTrans(None)
        try:
            print("请求地址:" + url)
            ret = trans.post_encodedata(url, params, None)
            print("返回结果:" + ret)
        except Exception as err:
            # Exception rather than BaseException, so KeyboardInterrupt and
            # SystemExit are not swallowed and reported as a failed call.
            print('[' + url + '] 请求异常:' + str(err))
            return self.gen_inner_resp_data("0", str(err))
        else:
            return json.loads(ret)

    @staticmethod
    def exec_rest_api(method, api_url, params, timeout: int = None):
        """
        Execute one REST call with header-based or certificate-based auth.

        When params contains "headers" the call is sent with those headers
        and TLS verification disabled. Otherwise, when params["ca_flag"] is
        True, the certificate files named in K8SOptJsonStr are used.

        :param method: HTTP method name
        :param api_url: target URL
        :param params: dict that may hold "data", "params", "headers" and
                       "ca_flag"; None is treated as an empty dict
        :param timeout: request timeout in seconds; 240 when None
        :return: (True, response) on success; (False, error text) when the
                 request raises; (False, None) when params selects no
                 usable authentication mode
        """
        if timeout is None:
            timeout = 240
        if params is None:
            params = {}

        request_kwargs = {
            "data": params[DATA_STR] if DATA_STR in params else None,
            "params": params[PARAMS_STR] if PARAMS_STR in params else None,
            "timeout": timeout,
        }

        if HEADERS in params:
            request_kwargs["headers"] = params[HEADERS]
            # Verification is off on this path; silence urllib3's warnings
            # about unverified requests.
            urllib3.disable_warnings()
            request_kwargs["verify"] = False
        elif CA_FLAG in params:
            if params[CA_FLAG] is not True:
                return False, None
            request_kwargs["cert"] = (K8SOptJsonStr.CLIENT_CRT_FILE_PATH,
                                      K8SOptJsonStr.CLIENT_KEY_FILE_PATH)
            request_kwargs["verify"] = K8SOptJsonStr.CA_CRT_FILE_PATH
        else:
            return False, None

        # Both modes share the timeout and the error handling, so a failing
        # certificate-based call is reported the same way as a token one.
        try:
            print(api_url)
            resp = requests.request(method, api_url, **request_kwargs)
            resp.encoding = 'utf-8'
        except Exception as e:
            print("request请求异常" + str(e))
            return False, "访问URL异常"
        return True, resp

    @staticmethod
    def _ensure_parent_dir(file_path):
        """Create the directory that will contain file_path, if needed."""
        directory = os.path.dirname(file_path)
        if not directory:
            return
        try:
            os.makedirs(directory, exist_ok=True)
        except OSError as e:
            # Best effort: a failure here surfaces when the file is written.
            print(e)

    def ca_handle(self, ca_crt_data, client_crt_data, client_key_data):
        """
        Write the CA certificate, client certificate and client key files.

        :param ca_crt_data: CA certificate body
        :param client_crt_data: client certificate body
        :param client_key_data: client private key body
        :return: True when all three files were written, otherwise False
                 (including when any argument is None)
        """
        if (ca_crt_data is None or client_crt_data is None
                or client_key_data is None):
            return False

        for file_path in (K8SOptJsonStr.CA_CRT_FILE_PATH,
                          K8SOptJsonStr.CLIENT_CRT_FILE_PATH,
                          K8SOptJsonStr.CLIENT_KEY_FILE_PATH):
            self._ensure_parent_dir(file_path)

        # All three writes are attempted even if an earlier one fails.
        ca_ok = self.generate_ca_crt_file(
            K8SOptJsonStr.CA_CRT_FILE_PATH, ca_crt_data)
        crt_ok = self.generate_client_crt_file(
            K8SOptJsonStr.CLIENT_CRT_FILE_PATH, client_crt_data)
        key_ok = self.generate_client_key_file(
            K8SOptJsonStr.CLIENT_KEY_FILE_PATH, client_key_data)
        return bool(ca_ok and crt_ok and key_ok)

    @staticmethod
    def header_handle(headers, token):
        """
        Add the bearer token to the request headers.

        :param headers: header dict, modified in place
        :param token: token text; nothing is added when None
        :return: the same headers object
        """
        if token is not None:
            headers[K8SRestStr.AUTHORIZATION_STR] = (
                K8SRestStr.BEARER_STR + token)
        return headers

    @staticmethod
    def _write_pem_file(file_path, begin_marker, payload, end_marker):
        """Write begin marker, payload and end marker to file_path."""
        try:
            # The context manager closes the file even if a write fails.
            with open(file_path, 'w') as pem_file:
                pem_file.write(begin_marker + "\n")
                pem_file.write(payload + "\n")
                pem_file.write(end_marker)
            return True
        except Exception as e:
            print(e)
            return False

    @staticmethod
    def generate_ca_crt_file(file_path, ca_crt_data):
        """
        Write the CA certificate file.

        :param file_path: destination path
        :param ca_crt_data: certificate body placed between the markers
        :return: True on success, False when writing fails
        """
        return CommonFunc._write_pem_file(
            file_path,
            K8SOptJsonStr.BEGIN_CERTIFICATE,
            ca_crt_data,
            K8SOptJsonStr.END_CERTIFICATE)

    @staticmethod
    def generate_client_crt_file(file_path, client_crt_data):
        """
        Write the client certificate file.

        :param file_path: destination path
        :param client_crt_data: certificate body placed between the markers
        :return: True on success, False when writing fails
        """
        return CommonFunc._write_pem_file(
            file_path,
            K8SOptJsonStr.BEGIN_CERTIFICATE,
            client_crt_data,
            K8SOptJsonStr.END_CERTIFICATE)

    @staticmethod
    def generate_client_key_file(file_path, client_key_data):
        """
        Write the client private key file.

        :param file_path: destination path
        :param client_key_data: key body placed between the markers
        :return: True on success, False when writing fails
        """
        # Success must be reported as True: ca_handle requires all three
        # writers to return a truthy value.
        # NOTE(review): the key is written with default file permissions;
        # consider restricting them.
        return CommonFunc._write_pem_file(
            file_path,
            K8SOptJsonStr.BEGIN_RSA_PRIVATE_KEY,
            client_key_data,
            K8SOptJsonStr.END_RSA_PRIVATE_KEY)

    def _build_k8s_result(self, resp):
        """Translate an HTTP response into an inner result dict."""
        status_code = resp.status_code
        if 200 <= status_code < 300:
            # A response without a Content-Type header is treated as
            # non-JSON instead of raising on a None membership test.
            content_type = resp.headers.get(ConstGen.CONTENT_TYPE) or ""
            if (status_code != 204
                    and ConstGen.CONTENT_TYPE_JSON in content_type):
                resp_data = json.loads(resp.text)
                # A 2xx reply can still carry a failure status in its body.
                if (K8SRestStr.STATUS in resp_data
                        and resp_data[K8SRestStr.STATUS]
                        == ResponseCode.Failure.value):
                    return self.gen_inner_resp_data(
                        ConstGen.FAIL_CODE, resp_data[K8SRestStr.MESSAGE])
                return self.gen_inner_resp_data(
                    ConstGen.SUCCESS_CODE, ConstGen.SUCCESS_STR, resp_data)
            return self.gen_inner_resp_data(
                ConstGen.SUCCESS_CODE, ConstGen.SUCCESS_STR, resp.text)
        if status_code == 404:
            print(resp.text + "请求资源未找到")
            return self.gen_inner_resp_data(ConstGen.FAIL_CODE, "请求资源未找到")
        if status_code == 409:
            return self.gen_inner_resp_data(ConstGen.FAIL_CODE, "该资源已存在")
        return self.gen_inner_resp_data(ConstGen.FAIL_CODE, resp.text)

    def pre_exec_rest_api(self, method, request_url, params=None, token=None,
                          ca_crt_data=None, client_crt_data=None,
                          client_key_data=None):
        """
        Prepare authentication, run the REST call and wrap the outcome.

        A token takes precedence over certificate data. The token value
        "disable" sends the headers without adding an authorization entry.
        The params dict is updated in place with the headers or the CA flag.

        :param method: HTTP method name
        :param request_url: target URL
        :param params: optional dict of request settings (see exec_rest_api)
        :param token: bearer token, "disable", or None
        :param ca_crt_data: CA certificate body
        :param client_crt_data: client certificate body
        :param client_key_data: client private key body
        :return: inner result dict built by gen_inner_resp_data
        """
        if params is None:
            params = {}

        if token is not None:
            headers = params[HEADERS] if HEADERS in params else {}
            if "disable" != token:
                headers = self.header_handle(headers, token=token)
            if headers is None:
                return self.gen_inner_resp_data(
                    ConstGen.FAIL_CODE, "headers数据不存在")
            params[HEADERS] = headers
        elif (ca_crt_data is not None and client_crt_data is not None
              and client_key_data is not None):
            flag = self.ca_handle(
                ca_crt_data, client_crt_data, client_key_data)
            if not flag:
                return self.gen_inner_resp_data(
                    ConstGen.FAIL_CODE, "生成CA证书数据失败")
            params[CA_FLAG] = flag
        else:
            return self.gen_inner_resp_data(
                ConstGen.FAIL_CODE, "缺少TOKEN或CA证书数据")

        succeeded, resp = self.exec_rest_api(method, request_url, params)
        if succeeded is True:
            return self._build_k8s_result(resp)
        if succeeded is False and resp is not None:
            # resp holds the error text reported by exec_rest_api.
            return self.gen_inner_resp_data(ConstGen.FAIL_CODE, resp)
        return self.gen_inner_resp_data(
            ConstGen.FAIL_CODE, "Kubernetes操作失败")

    def get_json_from_jsonpath(self, jsonpath_list):
        """
        Ask the conversion service to turn JSONPath entries into JSON.

        :param jsonpath_list: entries sent under
                              ConstGen.TRANS_JSON_ARRAY_STR
        :return: the ConstGen.RET_VAL value of the reply when its return
                 code is not '0' and the value is present, otherwise None
        """
        trans_data = {ConstGen.TRANS_JSON_ARRAY_STR: jsonpath_list}
        print(trans_data)
        ret_data = self.request_trans_service(
            ConstGen.JSON_PATH_TO_JSON_URL, trans_data)
        print(ret_data)
        if (ret_data[ConstGen.RET_CODE] != '0'
                and ConstGen.RET_VAL in ret_data):
            return ret_data[ConstGen.RET_VAL]
        return None

    def pop_json_key(self, json_data, keys: list):
        """
        Remove the given keys from a dict and from its nested dicts.

        Only dict values are descended into; dicts inside lists are left
        untouched. The input is modified in place.

        :param json_data: structure to clean
        :param keys: keys to remove
        :return: True when json_data is a dict, otherwise False
        """
        if not isinstance(json_data, dict):
            return False
        # Iterate over a snapshot of the keys: entries are removed while
        # looping.
        for k in list(json_data.keys()):
            if k in keys:
                json_data.pop(k)
            elif isinstance(json_data[k], dict):
                self.pop_json_key(json_data[k], keys)
        return True

    def alter_json_value(self, json_data, json_key, value):
        """
        Set every occurrence of a key, at any dict nesting depth, to value.

        Only dict values are descended into; dicts inside lists are left
        untouched. The input is modified in place.

        :param json_data: structure to update
        :param json_key: key whose value is replaced
        :param value: new value
        :return: True when json_data is a dict, otherwise False
        """
        if not isinstance(json_data, dict):
            return False
        for k in list(json_data.keys()):
            if k == json_key:
                json_data[json_key] = value
            elif isinstance(json_data[k], dict):
                self.alter_json_value(json_data[k], json_key, value)
        return True