test_k8s_op.py 10.3 KB
Newer Older
qunfeng qiu's avatar
qunfeng qiu committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
"""
@Version: 1.0
@Python Version:3.6.6
@Author: wenhx8
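@File: test_k8s_op
@Time: 2019/1/8 008 9:35
@Description: Tests that call the namespaced CRD helpers of K8sOps (create, patch, get, list, delete) and print the results.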
"""

from unittest import TestCase

from cucc_common_pkg.globalconst import GlobalConst
from cucc_common_pkg.cucc_k8s import K8sOps
import json
from jsonpath_rw_ext import parser


class TestK8sOps(TestCase):
    r'''
    Tests for the namespaced CRD helpers exposed by ``K8sOps``.

    Each test calls one ``crd_ns_*`` helper with the shared endpoint and
    token defined below and prints whatever the helper returns.

    NOTE(review): the helpers presumably talk to the live API server at
    ``_K8S_IP_PORT``, so these are integration tests rather than unit tests.
    NOTE(review): no test asserts on the response, so a test only fails when
    the helper raises. The shape of the return values is not visible from
    this file; add assertions once it is confirmed.
    '''

    # Address (host:port) passed to every helper as ``str_k8s_ip_port``.
    _K8S_IP_PORT = "10.124.151.110:6443"

    # Token passed to every helper as ``str_k8s_token``.
    # NOTE(review): this is a hard-coded credential committed to the
    # repository (it looks like a service-account JWT). It should be rotated
    # and loaded from the environment or a secret store instead.
    _K8S_TOKEN = 'eyJhbGciOiJSUzI1NiIsImtpZCI6IiJ9.eyJpc3MiOiJrdWJlcm5ldGVzL3NlcnZpY2VhY2NvdW50Iiwia3ViZXJuZXRlcy5pby9zZXJ2aWNlYWNjb3VudC9uYW1lc3BhY2UiOiJjc20tbnMiLCJrdWJlcm5ldGVzLmlvL3NlcnZpY2VhY2NvdW50L3NlY3JldC5uYW1lIjoiY3NtLXNhLXRva2VuLWc3cHZwIiwia3ViZXJuZXRlcy5pby9zZXJ2aWNlYWNjb3VudC9zZXJ2aWNlLWFjY291bnQubmFtZSI6ImNzbS1zYSIsImt1YmVybmV0ZXMuaW8vc2VydmljZWFjY291bnQvc2VydmljZS1hY2NvdW50LnVpZCI6IjZjY2RkOWJiLTA1ZTktMTFlYS1hZmM4LTUyNTQwMDE3ZDk2MiIsInN1YiI6InN5c3RlbTpzZXJ2aWNlYWNjb3VudDpjc20tbnM6Y3NtLXNhIn0.ohnipl80g-ITUOK1T7dMMeaDYDiYiCrTZ9qJVeXCnFSZFLLic7GeUWn2zAaYUe57PHh0-VuT99cyx6uVb-GFDx9bkoLvgVK_nZBEHNVin6EyeP2jnvtQ6oZGBfDfRYLrSfkTauyqnnQ3hHSZbuAt55VrZFR2ragUx30gmmnaynjQb8QMduqmvzbzKUpHgbjWzb6V93j-sR-ulLVzkbW913ceU5od6daxWiVt-98P9QQoyZMG3JwxqEOCK_G1ZzNrKrUm-DgZzx9P2HdZKfcTDyiVNfyhaZ0eOmWqwh1Xtw-NUatrlHmQYqj0FoBP0Zl1cVajlHqHwyVaVfHLephC4g'

    # URL fragments shared by the Istio Gateway tests. Each keeps its leading
    # slash because that is how the values are handed to the helpers.
    _ISTIO_GROUP_VERSION = "/networking.istio.io/v1alpha3"
    _ISTIO_NS = "/istio-system"
    _GATEWAYS = "/gateways"

    @staticmethod
    def _gateway_manifest(port_number, namespace="istio-system"):
        '''
        Build the ``wen-gateway`` Istio Gateway manifest used as ``dict_args``.

        :param port_number: value stored in ``spec.servers[0].port.number``
        :param namespace: value stored in ``metadata.namespace``
        :return: a new dict describing the Gateway
        '''
        return {
            "apiVersion": "networking.istio.io/v1alpha3",
            "kind": "Gateway",
            "metadata": {"name": "wen-gateway", "namespace": namespace},
            "spec": {
                "selector": {"istio": "ingressgateway"},
                "servers": [
                    {
                        "port": {
                            "number": port_number,
                            "name": "http",
                            "protocol": "HTTP",
                        },
                        "hosts": ["*"],
                    }
                ],
            },
        }

    def setUp(self):
        '''
        Create a fresh ``K8sOps(None)`` client before each test.

        :return:
        '''
        self.k8s = K8sOps(None)

    def test_crd_ns_create_obj(self):
        '''
        Create a Gateway object through ``crd_ns_create_obj`` and print the result.

        NOTE(review): the request namespace is ``/istio-dd`` while the
        manifest's ``metadata.namespace`` is ``istio-system``; confirm which
        one is intended.

        :return:
        '''
        ret_str = self.k8s.crd_ns_create_obj(
            str_k8s_ip_port=self._K8S_IP_PORT,
            str_k8s_token=self._K8S_TOKEN,
            str_crd_ns_name="/istio-dd",
            str_crd_group_version=self._ISTIO_GROUP_VERSION,
            str_crd_obj=self._GATEWAYS,
            dict_args=self._gateway_manifest(80),
        )

        print(ret_str)

    def test_crd_ns_patch_obj(self):
        '''
        Patch the ``wen-gateway`` Gateway through ``crd_ns_patch_obj``.

        The patch body is printed as JSON before the call, and the helper's
        result is printed afterwards.

        :return:
        '''
        # Written out in full (instead of using _gateway_manifest) so that
        # "servers" stays ahead of "selector" and the printed JSON keeps the
        # key order this test has always produced.
        dict_args = {
            "apiVersion": "networking.istio.io/v1alpha3",
            "kind": "Gateway",
            "metadata": {"name": "wen-gateway", "namespace": "istio-system"},
            "spec": {
                "servers": [
                    {
                        "port": {"number": 8880, "name": "http", "protocol": "HTTP"},
                        "hosts": ["*"],
                    }
                ],
                "selector": {"istio": "ingressgateway"},
            },
        }

        print(json.dumps(dict_args))

        ret_str = self.k8s.crd_ns_patch_obj(
            str_k8s_ip_port=self._K8S_IP_PORT,
            str_k8s_token=self._K8S_TOKEN,
            str_crd_ns_name=self._ISTIO_NS,
            str_crd_group_version=self._ISTIO_GROUP_VERSION,
            str_crd_obj=self._GATEWAYS,
            str_crd_obj_name='/wen-gateway',
            dict_args=dict_args,
        )

        print(ret_str)

    def test_crd_ns_get_obj(self):
        '''
        Fetch the ``wen-gateway`` Gateway through ``crd_ns_get_obj`` and print it.

        NOTE(review): a full Gateway manifest is passed as ``dict_args``;
        whether the helper uses it for a read is not visible here.

        :return:
        '''
        ret_str = self.k8s.crd_ns_get_obj(
            str_k8s_ip_port=self._K8S_IP_PORT,
            str_k8s_token=self._K8S_TOKEN,
            str_crd_ns_name=self._ISTIO_NS,
            str_crd_group_version=self._ISTIO_GROUP_VERSION,
            str_crd_obj=self._GATEWAYS,
            str_crd_obj_name='/wen-gateway',
            dict_args=self._gateway_manifest(8880),
        )

        print(ret_str)

    def test_crd_ns_list_obj(self):
        '''
        List pods through ``crd_ns_list_obj`` and filter the result with JSONPath.

        The raw result and the JSONPath matches are both printed as JSON.

        NOTE(review): the request selects ``app=details`` but the JSONPath
        filter keeps only items labelled ``gitbook-csm-design-doc``, so the
        second print is presumably an empty list; confirm the intended label.

        :return:
        '''
        # Pods belong to the core API, which Kubernetes serves under
        # /api/v1; presumably that is why this call overrides the URL prefix.
        ret_str = self.k8s.crd_ns_list_obj(
            str_k8s_ip_port=self._K8S_IP_PORT,
            str_k8s_token=self._K8S_TOKEN,
            str_crd_ns_name=self._ISTIO_NS,
            str_crd_group_version="/v1",
            str_crd_obj="/pods",
            str_prefix_url="/api",
            dict_args={"labelSelector": "app=details"},
        )

        print(json.dumps(ret_str))
        match_result = parser.match('$.items[?(@.metadata.labels.app=="gitbook-csm-design-doc")]', ret_str)

        print(json.dumps(match_result))

    def test_crd_ns_delete_obj(self):
        '''
        Delete the ``bookinfo-gateway`` Gateway through ``crd_ns_delete_obj``.

        NOTE(review): the object being deleted is ``/bookinfo-gateway`` in
        ``/istio-system``, yet ``dict_args`` describes ``wen-gateway`` in
        namespace ``zhangx511``; confirm what the helper expects as a body.

        :return:
        '''
        ret_str = self.k8s.crd_ns_delete_obj(
            str_k8s_ip_port=self._K8S_IP_PORT,
            str_k8s_token=self._K8S_TOKEN,
            str_crd_ns_name=self._ISTIO_NS,
            str_crd_group_version=self._ISTIO_GROUP_VERSION,
            str_crd_obj=self._GATEWAYS,
            str_crd_obj_name='/bookinfo-gateway',
            dict_args=self._gateway_manifest(8880, namespace="zhangx511"),
        )

        print(ret_str)