#!/usr/bin/env python
# -*- encoding: utf-8 -*-
"""
@Version: 1.0
@Python Version:3.6.6
@Author: ludq1
@File: test_ludq_k8s_utils.py
@Time: 2021/05/04 22:01
@Description: Test code
"""
import os
import unittest
from typing import List
from unittest import TestCase, main

from cucc_common_pkg.ludq_utils.utils_k8s.k8s_api_objects import K8sApiObject
from cucc_common_pkg.ludq_utils.utils_k8s.k8s_utils import K8sUtils, K8sConst
from cucc_common_pkg.ludq_utils.utils_k8s_knative.knative_service_obj import KnativeServiceApiObject
from cucc_common_pkg.ludq_utils.utils_k8s_knative.knative_utils import KnativeUtils
from cucc_common_pkg.ludq_utils.utils_v2 import UtilityV2

# Reason shown by unittest when a test is skipped. In English: "test after
# accessToken, vpc_name_full, account_id and region_code are set correctly".
_SKIP_REASON = '需正确设置accessToken,vpc_name_full,account_id,region_code后测试'

# Image repository shared by every manifest and patch in this module.
_HELLOWORLD_IMAGE_REPO = (
    "harbor.dcos.guangzhou.unicom.local/istio/knative-demo/helloworld-python"
)

# JSON text of the "helloworld" Knative Service, split around the container
# image reference so that the image is the only part that varies.
_MANIFEST_HEAD = (
    ' { "apiVersion": "serving.knative.dev/v1", "kind": "Service",'
    ' "metadata": { "name": "helloworld", "namespace": "default" },'
    ' "spec": { "template": { "metadata": { "annotations":'
    ' { "autoscaling.knative.dev/metric": "rps" },'
    ' "creationTimestamp": null },'
    ' "spec": { "containerConcurrency": 0, "containers": [ {'
    ' "env": [ { "name": "TARGET", "value": "Sample python v1" } ],'
    ' "image": "'
)
_MANIFEST_TAIL = (
    '", "imagePullPolicy": "Always", "name": "user-container",'
    ' "ports": [ { "containerPort": 8080 } ],'
    ' "readinessProbe": { "successThreshold": 1,'
    ' "tcpSocket": { "port": 0 } },'
    ' "resources": {} } ],'
    ' "enableServiceLinks": false, "timeoutSeconds": 300 } },'
    ' "traffic": [ { "latestRevision": true, "percent": 100 } ] } } '
)


def _helloworld_image(framework: str) -> str:
    """Return the helloworld image reference for the given framework flavour.

    Args:
        framework: Middle part of the image tag, e.g. ``"nodejs"``, ``"go"``
            or ``"java"``.

    Returns:
        ``<repo>:4.0-<framework>-framework``.
    """
    return f"{_HELLOWORLD_IMAGE_REPO}:4.0-{framework}-framework"


def _helloworld_manifest(framework: str) -> str:
    """Return the JSON text of the "helloworld" Knative Service manifest.

    Args:
        framework: Flavour passed on to :func:`_helloworld_image` to pick the
            container image.

    Returns:
        A JSON document (as ``str``) describing a ``serving.knative.dev/v1``
        ``Service`` named ``helloworld`` in namespace ``default``.
    """
    return _MANIFEST_HEAD + _helloworld_image(framework) + _MANIFEST_TAIL


class TestUtility(TestCase):
    """Tests for the K8s API object model, ``K8sUtils`` and ``KnativeUtils``."""

    # skipIf(False, ...) never skips; the flag is left in place as a manual
    # switch for disabling the test.
    @unittest.skipIf(False, _SKIP_REASON)
    def test_format_result(self):
        """Check that ``K8sApiObject`` shortcut attributes and ``to_dict`` agree.

        Values written through the top-level attributes (``api_version``,
        ``kind``, ``namespace``, ``name``) must be readable both from those
        attributes and from ``obj_type`` / ``metadata``, and must appear in
        the dictionaries produced by ``to_dict``.
        """
        k8s_api_object = K8sApiObject()
        k8s_api_object.api_version = "v1"
        k8s_api_object.kind = "Pod"
        k8s_api_object.namespace = "default"
        k8s_api_object.name = "testpod"

        self.assertEqual("v1", k8s_api_object.api_version)
        self.assertEqual("v1", k8s_api_object.obj_type.api_version)
        self.assertEqual("Pod", k8s_api_object.kind)
        self.assertEqual("Pod", k8s_api_object.obj_type.kind)
        self.assertEqual("default", k8s_api_object.namespace)
        self.assertEqual("default", k8s_api_object.metadata.namespace)
        self.assertEqual("testpod", k8s_api_object.name)
        self.assertEqual("testpod", k8s_api_object.metadata.name)

        expect_dict_for_obj_type = {
            "apiVersion": "v1",
            "kind": "Pod",
        }
        self.assertEqual(expect_dict_for_obj_type, k8s_api_object.obj_type.to_dict())

        expect_dict_for_metadata = {
            "namespace": "default",
            "name": "testpod",
        }
        self.assertEqual(expect_dict_for_metadata, k8s_api_object.metadata.to_dict())

        # The full object is the type fields plus a nested "metadata" entry.
        expect_dict_for_obj = dict(**expect_dict_for_obj_type)
        expect_dict_for_obj["metadata"] = expect_dict_for_metadata
        self.assertEqual(expect_dict_for_obj, k8s_api_object.to_dict())

    # This method used to be defined twice with identical bodies; the second
    # definition silently replaced the first, so only one copy is kept.
    @unittest.skipIf(False, _SKIP_REASON)
    def test_k8s_utils(self):
        """Exercise list/delete/create/describe/patch through ``K8sUtils``.

        Reads the ``api_server`` and ``token`` environment variables to build
        the client. Presumably needs a reachable API server with Knative
        installed -- verify before running in CI.
        """
        k8s_utils = K8sUtils(
            api_server=os.getenv("api_server"),
            token=os.getenv("token"),
        )

        app_response = k8s_utils.list_k8s_objects(
            obj_type=K8sConst.OBJ_TYPE_KNATIVE_SVC,
            namespace="default",
        )
        self.assertTrue(app_response.is_ok())
        print(UtilityV2.dict2jsonstr(app_response.data))

        # Remove "helloworld" first whenever the namespace already holds any
        # Knative service, so the create call below starts from a clean slate.
        if app_response.data.get("items"):
            app_response = k8s_utils.delete_k8s_object(
                obj_type=K8sConst.OBJ_TYPE_KNATIVE_SVC,
                namespace="default",
                obj_name="helloworld",
            )
            self.assertTrue(app_response.is_ok())
            print(UtilityV2.dict2jsonstr(app_response.data))

        app_response = k8s_utils.create_k8s_object(
            obj_type=K8sConst.OBJ_TYPE_KNATIVE_SVC,
            namespace="default",
            object_yaml_content=_helloworld_manifest("nodejs"),
        )
        self.assertTrue(app_response.is_ok())
        print(UtilityV2.dict2jsonstr(app_response.data))

        app_response = k8s_utils.describe_k8s_object(
            obj_type=K8sConst.OBJ_TYPE_KNATIVE_SVC,
            namespace="default",
            obj_name="helloworld",
        )
        self.assertTrue(app_response.is_ok())
        print(UtilityV2.dict2jsonstr(app_response.data))

        # JSON patch: swap only the image of the first container.
        app_response = k8s_utils.patch_k8s_object_by_json_patch(
            obj_type=K8sConst.OBJ_TYPE_KNATIVE_SVC,
            namespace="default",
            obj_name="helloworld",
            patch_content=[
                {
                    "op": "replace",
                    "path": "/spec/template/spec/containers/0/image",
                    "value": _helloworld_image("go"),
                }
            ]
        )
        self.assertTrue(app_response.is_ok())
        print(UtilityV2.dict2jsonstr(app_response.data))

        # Merge patch: resend the whole manifest with a different image.
        app_response = k8s_utils.patch_k8s_object_by_merge_patch(
            obj_type=K8sConst.OBJ_TYPE_KNATIVE_SVC,
            namespace="default",
            obj_name="helloworld",
            object_yaml_content=_helloworld_manifest("java"),
        )
        self.assertTrue(app_response.is_ok())
        print(UtilityV2.dict2jsonstr(app_response.data))

    # skipIf(True, ...) means this test is always skipped as committed.
    @unittest.skipIf(True, _SKIP_REASON)
    def test_knative_utils(self):
        """Exercise list/delete/create/describe/patch through ``KnativeUtils``.

        Same flow as ``test_k8s_utils`` but using the typed
        ``KnativeServiceApiObject`` wrapper instead of raw JSON text. Reads
        the ``api_server`` and ``token`` environment variables.
        """
        knative_utils = KnativeUtils(
            api_server=os.getenv("api_server"),
            token=os.getenv("token"),
        )

        app_response = knative_utils.list_knative_svcs(
            namespace="default",
        )
        self.assertTrue(app_response.is_ok())
        svc_list: List[KnativeServiceApiObject] = app_response.data
        for tmp_knative_svc in svc_list:
            print(tmp_knative_svc.to_k8s_api_obj().to_dict())

        # Remove "helloworld" first whenever any service was listed.
        if app_response.data:
            app_response = knative_utils.delete_knative_svc(
                namespace="default",
                obj_name="helloworld",
            )
            self.assertTrue(app_response.is_ok())

        knative_service_api_obj: KnativeServiceApiObject = KnativeServiceApiObject.from_dict(
            UtilityV2.jsonstr2dict(_helloworld_manifest("nodejs"))
        )
        knative_service_api_obj.fill_attr_by_k8s_api_obj()

        app_response = knative_utils.create_knative_svc(
            knative_service_api_obj=knative_service_api_obj
        )
        self.assertTrue(app_response.is_ok())
        knative_service_api_obj = app_response.data
        print(UtilityV2.dict2jsonstr(knative_service_api_obj.to_k8s_api_obj().to_dict()))

        app_response = knative_utils.describe_knative_svc(
            namespace="default",
            obj_name="helloworld",
        )
        self.assertTrue(app_response.is_ok())
        knative_service_api_obj = app_response.data
        print(UtilityV2.dict2jsonstr(knative_service_api_obj.to_k8s_api_obj().to_dict()))

        # Change the image on the described object, then push it back as a
        # merge patch.
        knative_service_api_obj.template.revision_spec.pod_spec_dict["containers"][0][
            "image"] = _helloworld_image("java")
        app_response = knative_utils.patch_knative_service_api_obj_by_merge_patch(
            knative_service_api_obj=knative_service_api_obj
        )
        self.assertTrue(app_response.is_ok())
        knative_service_api_obj = app_response.data
        print(UtilityV2.dict2jsonstr(knative_service_api_obj.to_k8s_api_obj().to_dict()))


if __name__ == '__main__':
    main()