utils_v2.py 2.92 KB
Newer Older
qunfeng qiu's avatar
qunfeng qiu committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
"""
@Version: 1.0
@Python Version:3.6.6
@Author: ludq1
@Email: ludq1@chinaunicom.cn
@date: 2023/04/07 11:40:00
@Description:
"""

from typing import List

from .utils_base import UtilityBaseV2
from .utils_mesh_mng.cluster_istio import ClusterIstio, EnumClusterSource
from .utils_mesh_mng.csm_instance import CsmInstance
from .utils_mesh_mng.csm_instance_cluster import CsmInstanceCluster

try:
    import thread
except ImportError:
    import _thread as thread


class UtilityV2(UtilityBaseV2):
    r"""
    Convenience utility class: helper methods for inspecting the
    ``cluster_istio`` objects attached to a CSM instance or instance cluster.
    """

    def gen_all_cluster_istios(self, csm_instance: CsmInstance) -> List[ClusterIstio]:
        r"""
        Build the list of all cluster_istios of a CSM instance.

        The instance's own ``cluster_istio`` is always the first element; it is
        followed by the ``cluster_istio`` of every entry in
        ``csm_instance.other_clusters``, in iteration order.

        :param csm_instance: instance whose cluster_istios are collected
        :return: a newly created list (never the instance's own container)
        """
        cluster_istios = [csm_instance.cluster_istio]
        # A falsy ``other_clusters`` (e.g. None or an empty list) contributes nothing.
        cluster_istios.extend(
            instance_cluster.cluster_istio
            for instance_cluster in csm_instance.other_clusters or ()
        )
        return cluster_istios

    def is_instance_contain_platform_cluster(
            self,
            csm_instance: CsmInstance,
            only_judge_main_cluster: bool = False,
            only_judge_instance_clusters: bool = False,
    ) -> bool:
        r"""
        Tell whether a CSM instance contains a platform-provided cluster, i.e. a
        cluster_istio whose ``cluster_source`` equals
        ``EnumClusterSource.PLATFORM.value``.

        :param csm_instance: instance to inspect
        :param only_judge_main_cluster: when truthy, only the instance's own
            ``cluster_istio`` is checked. Takes precedence over
            ``only_judge_instance_clusters`` if both are set.
        :param only_judge_instance_clusters: when truthy, only the
            cluster_istios of ``csm_instance.other_clusters`` are checked.
        :return: True if any of the inspected cluster_istios is a platform
            cluster. With both flags left falsy, the main cluster and all
            other clusters are inspected.
        """
        platform_source = EnumClusterSource.PLATFORM.value

        if only_judge_main_cluster:
            return csm_instance.cluster_istio.cluster_source == platform_source

        if only_judge_instance_clusters:
            # Skip the main cluster; a falsy ``other_clusters`` means nothing to check.
            candidates = [
                instance_cluster.cluster_istio
                for instance_cluster in csm_instance.other_clusters or ()
            ]
        else:
            candidates = self.gen_all_cluster_istios(csm_instance)

        return any(
            cluster_istio.cluster_source == platform_source
            for cluster_istio in candidates
        )

    def is_instance_cluster_contain_platform_cluster(self, instance_cluster: CsmInstanceCluster) -> bool:
        r"""
        Tell whether an instance cluster is a platform-provided cluster, i.e.
        its ``cluster_istio.cluster_source`` equals
        ``EnumClusterSource.PLATFORM.value``.

        :param instance_cluster: instance cluster to inspect
        :return: True if the cluster source is the platform value, else False
        """
        return instance_cluster.cluster_istio.cluster_source == EnumClusterSource.PLATFORM.value