#!/usr/bin/env python # -*- encoding: utf-8 -*- """ @Version: 1.0 @Python Version:3.6.6 @Author: ludq1 @Email: ludq1@chinaunicom.cn @date: 2023/04/07 11:40:00 @Description: """ import urllib.parse import requests from .globalconst import GlobalConst, HttpConst, SkyGlobalConst from .my_utils import MyUtils from .sky_baseexception import MyBaseException, create_base_exception from .sky_httptrans import HttpTrans class K8sOps(object): # 标准API STANDARD_API = "/api" # 标准API版本 STANDARD_API_VERSION = "/v1" # 自定义API CRD_API = "/apis" # k8s的ns固定路径 K8S_NS = "/namespaces" def __init__(self, params): ''' Constructor ''' def crd_cs_list_obj(self, str_k8s_ip_port: str = None, str_k8s_token: str = None, str_prefix_url: str = CRD_API, str_crd_group_version: str = None, str_crd_obj: str = None, dict_args: dict = None) -> dict: ''' 获取自定义集群的对象列表 Args: str_k8s_ip_port: k8s 端口 str_k8s_token: k8s token str_prefix_url: k8s api固定前缀路径 ,/apis str_crd_group_version: k8s crd 组和版本 /networking.istio.io/v1alpha3 str_crd_obj: k8s crd 对象名称 /virtualservices dict_args: 查询传递参数 {"dd":"dd"} Returns: 返回对象字典 ''' url_args = '' if dict_args is not None: url_args = '?' for tmp_k, tmp_v in dict_args.items(): if url_args == '?': url_args = url_args + tmp_k + '=' + urllib.parse.quote(tmp_v) else: url_args = "&" + url_args + tmp_k + '=' + urllib.parse.quote(tmp_v) str_url = GlobalConst.DEFAULT_URLSCHEMA_SECRUE + str_k8s_ip_port + str_prefix_url + str_crd_group_version + str_crd_obj + url_args http_trans = HttpTrans(None) arg_extra_dict_header = {HttpConst.HEADER_KEY_CONTENT_TYPE: HttpConst.CONTENT_TYPE_JSON , HttpConst.AUTH_KEY: HttpConst.AUTH_BEARER_PREFIX + str_k8s_token } try: m_response = http_trans.get_data( urlstr=str_url, trans_data=dict_args, dict_header=arg_extra_dict_header ) ret_dict = {} if not m_response.ok: raise create_base_exception( '访问url出错,访问地址:{0},返回码{1}'.format(str_url + ":" + m_response.text, m_response.status_code) ) else: ret_dict[SkyGlobalConst.RETKEY_RET_CODE] = SkyGlobalConst.RETCODE_SUCESS_CODE ret_dict[SkyGlobalConst.RETKEY_RET_VAL] = SkyGlobalConst.RETCODE_SUCESS_CODE str_ret = m_response.text dict_ret = MyUtils.jsonstr2json(m_response.text) if dict_ret is None: raise create_base_exception( '返回的内容不是有效的json字符串:{}'.format(str_ret) ) ret_dict[SkyGlobalConst.RETKEY_DATAROWS] = dict_ret except requests.exceptions.RequestException as exception: return MyBaseException.format_exception_to_standard_dict(exception) except MyBaseException as exception: return MyBaseException.format_exception_to_standard_dict(exception) except Exception as exception: return MyBaseException.format_exception_to_standard_dict(exception) return ret_dict def crd_ns_list_obj(self, str_k8s_ip_port: str = None, str_k8s_token: str = None, str_prefix_url: str = CRD_API, str_crd_group_version: str = None, str_crd_ns_url: str = K8S_NS, str_crd_ns_name: str = None, str_crd_obj: str = None, dict_args: dict = None) -> dict: ''' 获取自定义集群的对象列表 Args: str_k8s_ip_port: k8s 端口 str_k8s_token: k8s token str_prefix_url: k8s api固定前缀路径 ,/apis str_crd_ns_url: k8s 命名空间固定路径 ,/namespaces str_crd_ns_name: k8s crd 集群命名空间 ,/csm str_crd_group_version: k8s crd 组和版本 /networking.istio.io/v1alpha3 str_crd_obj: k8s crd 对象名称 /virtualservices dict_args: 查询传递参数 {"dd":"dd"} Returns: 返回对象字典 ''' url_args = '' if dict_args is not None: url_args = '?' for tmp_k, tmp_v in dict_args.items(): if url_args == '?': url_args = url_args + tmp_k + '=' + urllib.parse.quote(tmp_v) else: url_args = "&" + url_args + tmp_k + '=' + urllib.parse.quote(tmp_v) str_url = GlobalConst.DEFAULT_URLSCHEMA_SECRUE + str_k8s_ip_port + str_prefix_url + str_crd_group_version + str_crd_ns_url + str_crd_ns_name + str_crd_obj + url_args http_trans = HttpTrans(None) arg_extra_dict_header = {HttpConst.HEADER_KEY_CONTENT_TYPE: HttpConst.CONTENT_TYPE_JSON , HttpConst.AUTH_KEY: HttpConst.AUTH_BEARER_PREFIX + str_k8s_token } try: m_response = http_trans.get_data( urlstr=str_url, trans_data=dict_args, dict_header=arg_extra_dict_header ) ret_dict = {} if not m_response.ok: raise create_base_exception( '访问url出错,访问地址:{0},返回码{1}'.format(str_url + ":" + m_response.text, m_response.status_code) ) else: ret_dict[SkyGlobalConst.RETKEY_RET_CODE] = SkyGlobalConst.RETCODE_SUCESS_CODE ret_dict[SkyGlobalConst.RETKEY_RET_VAL] = SkyGlobalConst.RETCODE_SUCESS_CODE str_ret = m_response.text dict_ret = MyUtils.jsonstr2json(m_response.text) if dict_ret is None: raise create_base_exception( '返回的内容不是有效的json字符串:{}'.format(str_ret) ) ret_dict[SkyGlobalConst.RETKEY_DATAROWS] = dict_ret except requests.exceptions.RequestException as exception: return MyBaseException.format_exception_to_standard_dict(exception) except MyBaseException as exception: return MyBaseException.format_exception_to_standard_dict(exception) except Exception as exception: return MyBaseException.format_exception_to_standard_dict(exception) return ret_dict def crd_ns_create_obj(self, str_k8s_ip_port: str = None, str_k8s_token: str = None, str_prefix_url: str = CRD_API, str_crd_group_version: str = None, str_crd_ns_url: str = K8S_NS, str_crd_ns_name: str = None, str_crd_obj: str = None, dict_args: dict = None) -> dict: ''' 创建自定义集群的对象列表 Args: str_k8s_ip_port: k8s 端口 str_k8s_token: k8s token str_prefix_url: k8s api固定前缀路径 ,/apis str_crd_ns_url: k8s 命名空间固定路径 ,/namespaces str_crd_ns_name: k8s crd 集群命名空间 ,/csm str_crd_group_version: k8s crd 组和版本 /networking.istio.io/v1alpha3 str_crd_obj: k8s crd 对象名称 /virtualservices dict_args: 查询传递参数 {"dd":"dd"} Returns: 返回对象字典 ''' str_url = GlobalConst.DEFAULT_URLSCHEMA_SECRUE + str_k8s_ip_port + str_prefix_url + str_crd_group_version + str_crd_ns_url + str_crd_ns_name + str_crd_obj http_trans = HttpTrans(None) arg_extra_dict_header = {HttpConst.HEADER_KEY_CONTENT_TYPE: HttpConst.CONTENT_TYPE_JSON , HttpConst.AUTH_KEY: HttpConst.AUTH_BEARER_PREFIX + str_k8s_token } try: m_response = http_trans.post_encodeddata_standard( urlstr=str_url, trans_data=dict_args, dict_header=arg_extra_dict_header ) ret_dict = {} if not m_response.ok: raise create_base_exception( '访问url出错,访问地址:{0},返回码{1}'.format(str_url + ":" + m_response.text, m_response.status_code) ) else: ret_dict[SkyGlobalConst.RETKEY_RET_CODE] = SkyGlobalConst.RETCODE_SUCESS_CODE ret_dict[SkyGlobalConst.RETKEY_RET_VAL] = SkyGlobalConst.RETCODE_SUCESS_CODE str_ret = m_response.text dict_ret = MyUtils.jsonstr2json(m_response.text) if dict_ret is None: raise create_base_exception( '返回的内容不是有效的json字符串:{}'.format(str_ret) ) ret_dict[SkyGlobalConst.RETKEY_DATAROWS] = dict_ret except requests.exceptions.RequestException as exception: return MyBaseException.format_exception_to_standard_dict(exception) except MyBaseException as exception: return MyBaseException.format_exception_to_standard_dict(exception) except Exception as exception: return MyBaseException.format_exception_to_standard_dict(exception) return ret_dict def crd_ns_patch_obj(self, str_k8s_ip_port: str = None, str_k8s_token: str = None, str_prefix_url: str = CRD_API, str_crd_group_version: str = None, str_crd_ns_url: str = K8S_NS, str_crd_ns_name: str = None, str_crd_obj: str = None, str_crd_obj_name: str = None, dict_args: dict = None) -> dict: ''' 更新自定义集群的对象列表 Args: str_k8s_ip_port: k8s 端口 str_k8s_token: k8s token str_prefix_url: k8s api固定前缀路径 ,/apis str_crd_ns_url: k8s 命名空间固定路径 ,/namespaces str_crd_ns_name: k8s crd 集群命名空间 ,/csm str_crd_group_version: k8s crd 组和版本 /networking.istio.io/v1alpha3 str_crd_obj: k8s crd 对象名称 /virtualservices str_crd_obj_name: k8s crd 对象实例名称 /wen-gateway dict_args: 查询传递参数 {"dd":"dd"} Returns: 返回对象字典 ''' str_url = GlobalConst.DEFAULT_URLSCHEMA_SECRUE + str_k8s_ip_port + str_prefix_url + str_crd_group_version + str_crd_ns_url + str_crd_ns_name + str_crd_obj + str_crd_obj_name http_trans = HttpTrans(None) arg_extra_dict_header = {HttpConst.HEADER_KEY_CONTENT_TYPE: HttpConst.CONTENT_TYPE_PATH_JSON , HttpConst.AUTH_KEY: HttpConst.AUTH_BEARER_PREFIX + str_k8s_token } try: m_response = http_trans.patch_encodeddata_standard( urlstr=str_url, trans_data=dict_args, dict_header=arg_extra_dict_header ) ret_dict = {} if not m_response.ok: raise create_base_exception( '访问url出错,访问地址:{0},返回码{1}'.format(str_url + ":" + m_response.text, m_response.status_code) ) else: ret_dict[SkyGlobalConst.RETKEY_RET_CODE] = SkyGlobalConst.RETCODE_SUCESS_CODE ret_dict[SkyGlobalConst.RETKEY_RET_VAL] = SkyGlobalConst.RETCODE_SUCESS_CODE str_ret = m_response.text dict_ret = MyUtils.jsonstr2json(m_response.text) if dict_ret is None: raise create_base_exception( '返回的内容不是有效的json字符串:{}'.format(str_ret) ) ret_dict[SkyGlobalConst.RETKEY_DATAROWS] = dict_ret except requests.exceptions.RequestException as exception: return MyBaseException.format_exception_to_standard_dict(exception) except MyBaseException as exception: return MyBaseException.format_exception_to_standard_dict(exception) except Exception as exception: return MyBaseException.format_exception_to_standard_dict(exception) return ret_dict def crd_ns_get_obj(self, str_k8s_ip_port: str = None, str_k8s_token: str = None, str_prefix_url: str = CRD_API, str_crd_group_version: str = None, str_crd_ns_url: str = K8S_NS, str_crd_ns_name: str = None, str_crd_obj: str = None, str_crd_obj_name: str = None, dict_args: dict = None) -> dict: ''' 获取自定义集群的对象信息 Args: str_k8s_ip_port: k8s 端口 str_k8s_token: k8s token str_prefix_url: k8s api固定前缀路径 ,/apis str_crd_ns_url: k8s 命名空间固定路径 ,/namespaces str_crd_ns_name: k8s crd 集群命名空间 ,/csm str_crd_group_version: k8s crd 组和版本 /networking.istio.io/v1alpha3 str_crd_obj: k8s crd 对象名称 /virtualservices str_crd_obj_name: k8s crd 对象实例名称 /wen-gateway dict_args: 查询传递参数 {"dd":"dd"} Returns: 返回对象字典 ''' str_url = GlobalConst.DEFAULT_URLSCHEMA_SECRUE + str_k8s_ip_port + str_prefix_url + str_crd_group_version + str_crd_ns_url + str_crd_ns_name + str_crd_obj + str_crd_obj_name http_trans = HttpTrans(None) arg_extra_dict_header = {HttpConst.HEADER_KEY_CONTENT_TYPE: HttpConst.CONTENT_TYPE_PATH_JSON , HttpConst.AUTH_KEY: HttpConst.AUTH_BEARER_PREFIX + str_k8s_token } try: m_response = http_trans.get_data( urlstr=str_url, trans_data=dict_args, dict_header=arg_extra_dict_header ) ret_dict = {} if not m_response.ok: raise create_base_exception( '访问url出错,访问地址:{0},返回码{1}'.format(str_url + ":" + m_response.text, m_response.status_code) ) else: ret_dict[SkyGlobalConst.RETKEY_RET_CODE] = SkyGlobalConst.RETCODE_SUCESS_CODE ret_dict[SkyGlobalConst.RETKEY_RET_VAL] = SkyGlobalConst.RETCODE_SUCESS_CODE str_ret = m_response.text dict_ret = MyUtils.jsonstr2dict(m_response.text) if dict_ret is None: raise create_base_exception( '返回的内容不是有效的json字符串:{}'.format(str_ret) ) ret_dict[SkyGlobalConst.RETKEY_DATAROWS] = dict_ret except requests.exceptions.RequestException as exception: return MyBaseException.format_exception_to_standard_dict(exception) except MyBaseException as exception: return MyBaseException.format_exception_to_standard_dict(exception) except Exception as exception: return MyBaseException.format_exception_to_standard_dict(exception) return ret_dict def crd_ns_delete_obj(self, str_k8s_ip_port: str = None, str_k8s_token: str = None, str_prefix_url: str = CRD_API, str_crd_group_version: str = None, str_crd_ns_url: str = K8S_NS, str_crd_ns_name: str = None, str_crd_obj: str = None, str_crd_obj_name: str = None, dict_args: dict = None) -> dict: ''' 删除自定义集群的对象信息 Args: str_k8s_ip_port: k8s 端口 str_k8s_token: k8s token str_prefix_url: k8s api固定前缀路径 ,/apis str_crd_ns_url: k8s 命名空间固定路径 ,/namespaces str_crd_ns_name: k8s crd 集群命名空间 ,/csm str_crd_group_version: k8s crd 组和版本 /networking.istio.io/v1alpha3 str_crd_obj: k8s crd 对象名称 /virtualservices str_crd_obj_name: k8s crd 对象实例名称 /wen-gateway dict_args: 查询传递参数 {"dd":"dd"} Returns: 返回对象字典 ''' str_url = GlobalConst.DEFAULT_URLSCHEMA_SECRUE + str_k8s_ip_port + str_prefix_url + str_crd_group_version + str_crd_ns_url + str_crd_ns_name + str_crd_obj + str_crd_obj_name http_trans = HttpTrans(None) arg_extra_dict_header = {HttpConst.HEADER_KEY_CONTENT_TYPE: HttpConst.CONTENT_TYPE_PATH_JSON , HttpConst.AUTH_KEY: HttpConst.AUTH_BEARER_PREFIX + str_k8s_token } try: m_response = http_trans.delete_encodeddata_standard( urlstr=str_url, trans_data=dict_args, dict_header=arg_extra_dict_header ) ret_dict = {} if not m_response.ok: raise create_base_exception( '访问url出错,访问地址:{0},返回码{1}'.format(str_url + ":" + m_response.text, m_response.status_code) ) else: ret_dict[SkyGlobalConst.RETKEY_RET_CODE] = SkyGlobalConst.RETCODE_SUCESS_CODE ret_dict[SkyGlobalConst.RETKEY_RET_VAL] = SkyGlobalConst.RETCODE_SUCESS_CODE except requests.exceptions.RequestException as exception: return MyBaseException.format_exception_to_standard_dict(exception) except MyBaseException as exception: return MyBaseException.format_exception_to_standard_dict(exception) except Exception as exception: return MyBaseException.format_exception_to_standard_dict(exception) return ret_dict