Skip to content
GitLab
Projects
Groups
Snippets
Help
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Open sidebar
qiuqunfeng
common_pkg
Commits
11b2601e
Commit
11b2601e
authored
Mar 13, 2025
by
qunfeng qiu
Browse files
Initial commit
parents
Changes
148
Show whitespace changes
Inline
Side-by-side
Showing
20 changed files
with
2957 additions
and
0 deletions
+2957
-0
cucc_common_pkg/ludq_utils/utils_promapi/prom_utils.py
cucc_common_pkg/ludq_utils/utils_promapi/prom_utils.py
+303
-0
cucc_common_pkg/ludq_utils/utils_tg_billing/__init__.py
cucc_common_pkg/ludq_utils/utils_tg_billing/__init__.py
+10
-0
cucc_common_pkg/ludq_utils/utils_tg_billing/billing_create_order_info.py
.../ludq_utils/utils_tg_billing/billing_create_order_info.py
+84
-0
cucc_common_pkg/ludq_utils/utils_tg_billing/billing_feededuct_info.py
...pkg/ludq_utils/utils_tg_billing/billing_feededuct_info.py
+53
-0
cucc_common_pkg/ludq_utils/utils_tg_billing/billing_update_order_info.py
.../ludq_utils/utils_tg_billing/billing_update_order_info.py
+41
-0
cucc_common_pkg/ludq_utils/utils_tg_billing/billing_utils.py
cucc_common_pkg/ludq_utils/utils_tg_billing/billing_utils.py
+273
-0
cucc_common_pkg/ludq_utils/utils_tg_configcenter/__init__.py
cucc_common_pkg/ludq_utils/utils_tg_configcenter/__init__.py
+10
-0
cucc_common_pkg/ludq_utils/utils_tg_configcenter/configcenter_region_quota.py
..._utils/utils_tg_configcenter/configcenter_region_quota.py
+123
-0
cucc_common_pkg/ludq_utils/utils_tg_configcenter/configcenter_response.py
...ludq_utils/utils_tg_configcenter/configcenter_response.py
+45
-0
cucc_common_pkg/ludq_utils/utils_tg_configcenter/configcenter_utils.py
...kg/ludq_utils/utils_tg_configcenter/configcenter_utils.py
+389
-0
cucc_common_pkg/ludq_utils/utils_tg_iam/__init__.py
cucc_common_pkg/ludq_utils/utils_tg_iam/__init__.py
+10
-0
cucc_common_pkg/ludq_utils/utils_tg_iam/iam_instance.py
cucc_common_pkg/ludq_utils/utils_tg_iam/iam_instance.py
+40
-0
cucc_common_pkg/ludq_utils/utils_tg_iam/iam_response.py
cucc_common_pkg/ludq_utils/utils_tg_iam/iam_response.py
+26
-0
cucc_common_pkg/ludq_utils/utils_tg_iam/iam_utils.py
cucc_common_pkg/ludq_utils/utils_tg_iam/iam_utils.py
+435
-0
cucc_common_pkg/ludq_utils/utils_tg_sso/__init__.py
cucc_common_pkg/ludq_utils/utils_tg_sso/__init__.py
+10
-0
cucc_common_pkg/ludq_utils/utils_tg_sso/sso_utils.py
cucc_common_pkg/ludq_utils/utils_tg_sso/sso_utils.py
+266
-0
cucc_common_pkg/ludq_utils/utils_tg_vpc/__init__.py
cucc_common_pkg/ludq_utils/utils_tg_vpc/__init__.py
+10
-0
cucc_common_pkg/ludq_utils/utils_tg_vpc/vpc_utils.py
cucc_common_pkg/ludq_utils/utils_tg_vpc/vpc_utils.py
+727
-0
cucc_common_pkg/ludq_utils/utils_v2.py
cucc_common_pkg/ludq_utils/utils_v2.py
+92
-0
cucc_common_pkg/ludq_utils/utils_webservice/__init__.py
cucc_common_pkg/ludq_utils/utils_webservice/__init__.py
+10
-0
No files found.
cucc_common_pkg/ludq_utils/utils_promapi/prom_utils.py
0 → 100644
View file @
11b2601e
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
"""
@Version: 1.0
@Python Version:3.6.6
@Author: ludq1
@Email: ludq1@chinaunicom.cn
@date: 2023/04/07 11:40:00
@Description:
"""
from
datetime
import
datetime
from
typing
import
Optional
,
List
,
Union
import
requests
from
requests.auth
import
HTTPBasicAuth
from
.prom_config
import
PromServerConfig
from
.prom_result_vector
import
PromQueryResult
,
PromResultValue
from
..base_const
import
ConstResponseCode
from
..common_app_config
import
CommonAppConfig
from
..utils_base
import
UtilityBaseV2
from
..utils_http.http_utils
import
HttpUtils
from
..utils_k8s.k8s_api_object_utils
import
OperationResultException
class
PromApiUtils
:
r
"""
Prometheus API Utils类
"""
prom_config
:
Optional
[
PromServerConfig
]
=
None
app_utils
=
None
http_utils
=
None
def
__init__
(
self
,
prom_config
:
PromServerConfig
,
**
kwargs
):
r
"""
初始化
:param prom_config:
:param kwargs:
:key my_app_utils:
:key http_utils:
:key incoming_request_headers: 透传的 incoming_request.headers
:key logger: 用于打印日志
"""
self
.
prom_config
=
prom_config
self
.
app_utils
=
kwargs
.
get
(
"app_utils"
)
or
UtilityBaseV2
(
logger
=
kwargs
.
get
(
"logger"
)
or
CommonAppConfig
().
common_logger
,
)
self
.
http_utils
=
kwargs
.
get
(
"http_utils"
)
or
HttpUtils
(
incoming_request_headers
=
kwargs
.
get
(
"incoming_request_headers"
),
logger
=
kwargs
.
get
(
"logger"
)
or
CommonAppConfig
().
common_logger
,
)
self
.
logger
=
kwargs
.
get
(
"logger"
)
or
CommonAppConfig
().
common_logger
def
query
(
self
,
query
:
str
,
param_time
:
Union
[
int
,
float
,
str
,
datetime
]
=
None
,
param_timeout
:
str
=
None
,
callback_fn
=
None
,
)
->
List
[
PromQueryResult
]:
r
"""
查询vector指标
:param query: 查询表达式
:param param_time: 可以是unix时间戳(秒数),也可以是本地日期的字符串,或者是datetime类型的日期
:param param_timeout: 示例 1m 1h 1d
:param callback_fn: 获取查询结果时,对每条数据的处理, 格式为 fn(result:PromQueryResult)
"""
param_time
=
self
.
app_utils
.
format_time_to_unix_timestamp_seconds
(
param_time
)
urlstr
=
f
"
{
self
.
prom_config
.
api_server
}
/api/v1/query"
content_type
=
"application/x-www-form-urlencoded"
post_data
=
dict
()
post_data
[
"query"
]
=
query
if
param_time
is
not
None
:
post_data
[
"time"
]
=
param_time
if
param_timeout
:
post_data
[
"timeout"
]
=
param_timeout
headers
=
{
"Content-Type"
:
content_type
}
if
self
.
prom_config
.
basic_auth
:
do_http_info
=
f
"访问prom api,post url:
{
urlstr
}
,data=
{
post_data
}
,headers=
{
headers
}
"
response
=
self
.
http_utils
.
do_http_post_with_log_time
(
do_http_info
=
do_http_info
,
url
=
urlstr
,
data
=
post_data
,
headers
=
headers
,
verify
=
False
,
auth
=
HTTPBasicAuth
(
self
.
prom_config
.
basic_auth_user
,
self
.
prom_config
.
basic_auth_passwd
),
)
else
:
do_http_info
=
f
"访问prom api,post url:
{
urlstr
}
,data=
{
post_data
}
,headers=
{
headers
}
"
response
=
self
.
http_utils
.
do_http_post_with_log_time
(
do_http_info
=
do_http_info
,
url
=
urlstr
,
data
=
post_data
,
headers
=
headers
,
verify
=
False
,
)
response_data
:
dict
=
self
.
_get_data_in_response
(
response
=
response
,
post_data
=
post_data
)
return
self
.
_handle_query_response_data
(
response_data
=
response_data
,
callback_fn
=
callback_fn
)
def
query_range
(
self
,
query
:
str
,
start_time
:
Union
[
int
,
float
,
str
,
datetime
],
end_time
:
Union
[
int
,
float
,
str
,
datetime
],
param_step
:
Union
[
int
,
float
,
str
],
param_timeout
:
str
=
None
,
callback_fn
=
None
,
)
->
List
[
PromQueryResult
]:
r
"""
查询vector指标
:param query: 查询表达式
:param start_time: 可以是unix时间戳(秒数),也可以是本地日期的字符串,或者是datetime类型的日期
:param end_time: 可以是unix时间戳(秒数),也可以是本地日期的字符串,或者是datetime类型的日期
:param param_step: 可以是 1m 1h 1d 这样的字符串表现形式 , 或者是 int/float 表示的 秒数
:param param_timeout: 示例 1m 1h 1d
:param callback_fn: 获取查询结果时,对每条数据的处理, 格式为 fn(result:PromQueryResult)
"""
start_time
=
self
.
app_utils
.
format_time_to_unix_timestamp_seconds
(
start_time
)
end_time
=
self
.
app_utils
.
format_time_to_unix_timestamp_seconds
(
end_time
)
urlstr
=
f
"
{
self
.
prom_config
.
api_server
}
/api/v1/query_range"
content_type
=
"application/x-www-form-urlencoded"
post_data
=
dict
()
post_data
[
"query"
]
=
query
post_data
[
"start"
]
=
start_time
post_data
[
"end"
]
=
end_time
post_data
[
"step"
]
=
param_step
if
param_timeout
:
post_data
[
"timeout"
]
=
param_timeout
headers
=
{
"Content-Type"
:
content_type
}
if
self
.
prom_config
.
basic_auth
:
do_http_info
=
f
"访问prom api,post url:
{
urlstr
}
,data=
{
post_data
}
,headers=
{
headers
}
"
response
=
self
.
http_utils
.
do_http_post_with_log_time
(
do_http_info
=
do_http_info
,
url
=
urlstr
,
data
=
post_data
,
headers
=
headers
,
verify
=
False
,
auth
=
HTTPBasicAuth
(
self
.
prom_config
.
basic_auth_user
,
self
.
prom_config
.
basic_auth_passwd
),
)
else
:
do_http_info
=
f
"访问prom api,post url:
{
urlstr
}
,data=
{
post_data
}
,headers=
{
headers
}
"
response
=
self
.
http_utils
.
do_http_post_with_log_time
(
do_http_info
=
do_http_info
,
url
=
urlstr
,
data
=
post_data
,
headers
=
headers
,
verify
=
False
,
)
response_data
:
dict
=
self
.
_get_data_in_response
(
response
=
response
,
post_data
=
post_data
)
return
self
.
_handle_query_response_data
(
response_data
=
response_data
,
callback_fn
=
callback_fn
)
def
_get_data_in_response
(
self
,
response
:
requests
.
Response
,
post_data
):
r
"""
检查response状态码,以及response内容中的status字段,并返回response内容中的data字段
:param response:
:param post_data:
"""
status_code
=
response
.
status_code
if
status_code
==
401
:
raise
OperationResultException
(
failed_reason
=
f
"访问PromQueryApi:未授权(
{
status_code
}
)"
,
detail_info
=
f
"访问PromQueryApi:未授权(
{
status_code
}
),使用的user:dwp为
{
self
.
prom_config
.
basic_auth_user
}
:
{
self
.
prom_config
.
basic_auth_passwd
}
"
,
code
=
ConstResponseCode
.
CODE_SYS_ERROR
,
resp_status_code
=
status_code
,
resp_text
=
response
.
text
,
)
elif
status_code
==
400
:
raise
OperationResultException
(
failed_reason
=
f
"访问PromQueryApi:Bad Request(
{
status_code
}
)"
,
detail_info
=
f
"访问PromQueryApi:Bad Request(
{
status_code
}
),使用的post_data:
{
post_data
}
"
,
code
=
ConstResponseCode
.
CODE_SYS_ERROR
,
resp_status_code
=
status_code
,
resp_text
=
response
.
text
,
)
elif
status_code
==
422
:
raise
OperationResultException
(
failed_reason
=
f
"访问PromQueryApi:Unprocessable Entity(
{
status_code
}
)"
,
detail_info
=
f
"访问PromQueryApi:Unprocessable Entity(
{
status_code
}
),使用的post_data:
{
post_data
}
"
,
code
=
ConstResponseCode
.
CODE_SYS_ERROR
,
resp_status_code
=
status_code
,
resp_text
=
response
.
text
,
)
elif
status_code
==
503
:
raise
OperationResultException
(
failed_reason
=
f
"访问PromQueryApi:Service Unavailable(
{
status_code
}
)"
,
detail_info
=
f
"访问PromQueryApi:Service Unavailable(
{
status_code
}
),使用的post_data:
{
post_data
}
"
,
code
=
ConstResponseCode
.
CODE_SYS_ERROR
,
resp_status_code
=
status_code
,
resp_text
=
response
.
text
,
)
elif
status_code
!=
200
:
raise
OperationResultException
(
failed_reason
=
f
"访问PromQueryApi:非预期的状态码(
{
status_code
}
)"
,
detail_info
=
f
"访问PromQueryApi:非预期的状态码(
{
status_code
}
),使用的post_data:
{
post_data
}
"
,
code
=
ConstResponseCode
.
CODE_SYS_ERROR
,
resp_status_code
=
status_code
,
resp_text
=
response
.
text
,
)
response_text
=
response
.
text
response_json
=
self
.
app_utils
.
jsonstr2dict
(
response_text
,
b_raise_error_when_illegal
=
True
)
status_in_body
=
response_json
.
get
(
"status"
)
if
status_in_body
!=
"success"
:
error_type
=
response_json
.
get
(
"errorType"
)
error
=
response_json
.
get
(
"error"
)
raise
OperationResultException
(
failed_reason
=
f
"访问PromQueryApi错误:errorType=
{
error_type
}
, error=
{
error
}
"
,
detail_info
=
f
"访问PromQueryApi错误:返回的内容:
{
response_text
}
"
,
code
=
ConstResponseCode
.
CODE_SYS_ERROR
,
resp_status_code
=
status_code
,
resp_text
=
response
.
text
,
)
return
response_json
.
get
(
"data"
)
def
_handle_query_response_data
(
self
,
response_data
:
dict
,
callback_fn
=
None
,
)
->
List
[
PromQueryResult
]:
result_type
=
response_data
.
get
(
"resultType"
)
if
result_type
==
"matrix"
:
return
self
.
_handle_query_response_data_for_matrix
(
response_data
=
response_data
,
callback_fn
=
callback_fn
)
elif
result_type
==
"vector"
:
return
self
.
_handle_query_response_data_for_vector
(
response_data
=
response_data
,
callback_fn
=
callback_fn
)
elif
result_type
==
"scalar"
:
return
self
.
_handle_query_response_data_for_scalar
(
response_data
=
response_data
,
callback_fn
=
callback_fn
)
else
:
raise
OperationResultException
(
failed_reason
=
f
"访问PromQueryApi错误:不支持的resultType(
{
result_type
}
)"
,
detail_info
=
f
"访问PromQueryApi错误:不支持的resultType(
{
result_type
}
),data内容:
{
response_data
}
"
,
code
=
ConstResponseCode
.
CODE_SYS_ERROR
,
)
def
_handle_query_response_data_for_matrix
(
self
,
response_data
:
dict
,
callback_fn
=
None
,
)
->
List
[
PromQueryResult
]:
result_list
:
List
[
PromQueryResult
]
=
list
()
result_data
:
List
[
dict
]
=
response_data
.
get
(
"result"
)
for
tmp_dict
in
result_data
:
tmp_result
:
PromQueryResult
=
PromQueryResult
()
tmp_result
.
result_type
=
"matrix"
tmp_result
.
metric_labels
=
tmp_dict
.
get
(
"metric"
)
tmp_result
.
metric_value_list
=
list
()
for
tmp_value
in
tmp_dict
.
get
(
"values"
):
tmp_result
.
metric_value_list
.
append
(
self
.
_parse_single_result_to_prom_value
(
tmp_value
))
if
callback_fn
:
callback_fn
(
tmp_result
)
result_list
.
append
(
tmp_result
)
return
result_list
def
_handle_query_response_data_for_vector
(
self
,
response_data
:
dict
,
callback_fn
=
None
,
)
->
List
[
PromQueryResult
]:
result_list
:
List
[
PromQueryResult
]
=
list
()
result_data
:
List
[
dict
]
=
response_data
.
get
(
"result"
)
for
tmp_dict
in
result_data
:
tmp_result
:
PromQueryResult
=
PromQueryResult
()
tmp_result
.
result_type
=
"vector"
tmp_result
.
metric_labels
=
tmp_dict
.
get
(
"metric"
)
tmp_result
.
metric_value
=
self
.
_parse_single_result_to_prom_value
(
tmp_dict
.
get
(
"value"
))
if
callback_fn
:
callback_fn
(
tmp_result
)
result_list
.
append
(
tmp_result
)
return
result_list
def
_handle_query_response_data_for_scalar
(
self
,
response_data
:
dict
,
callback_fn
=
None
,
)
->
List
[
PromQueryResult
]:
result
:
PromQueryResult
=
PromQueryResult
()
result
.
result_type
=
"scalar"
result
.
metric_labels
=
None
result
.
metric_value
=
self
.
_parse_single_result_to_prom_value
(
response_data
.
get
(
"result"
))
if
callback_fn
:
callback_fn
(
result
)
return
[
result
]
def
_parse_single_result_to_prom_value
(
self
,
single_result
:
list
)
->
PromResultValue
:
return
PromResultValue
(
unix_timestamp
=
single_result
[
0
],
metric_value
=
single_result
[
1
],
)
cucc_common_pkg/ludq_utils/utils_tg_billing/__init__.py
0 → 100644
View file @
11b2601e
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
"""
@Version: 1.0
@Python Version:3.6.6
@Author: ludq1
@Email: ludq1@chinaunicom.cn
@date: 2023/04/07 11:40:00
@Description:
"""
cucc_common_pkg/ludq_utils/utils_tg_billing/billing_create_order_info.py
0 → 100644
View file @
11b2601e
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
"""
@Version: 1.0
@Python Version:3.6.6
@Author: ludq1
@Email: ludq1@chinaunicom.cn
@date: 2023/04/07 11:40:00
@Description:
"""
from
...globalutility
import
Utility
class
InstanceInfoInCreateOrderInfo
:
r
"""
订单信息中的实例信息 POJO类
"""
instance_id
:
str
=
None
instance_name
:
str
=
None
instance_desc
:
str
=
None
engine_name
:
str
=
None
engine_version
:
str
=
None
instance_status
:
str
=
None
def
dict_for_api
(
self
)
->
dict
:
r
"""
转换为调用API时需要的字典(json)对象
:return:
"""
ret_dict
=
dict
()
ret_dict
[
"instanceID"
]
=
self
.
instance_id
ret_dict
[
"name"
]
=
self
.
instance_name
ret_dict
[
"instanceDesc"
]
=
self
.
instance_desc
ret_dict
[
"engineName"
]
=
self
.
engine_name
ret_dict
[
"engineVersion"
]
=
self
.
engine_version
ret_dict
[
"instanceStatus"
]
=
self
.
instance_status
return
ret_dict
def
__str__
(
self
):
return
Utility
.
dict2jsonstr
(
self
.
__dict__
)
class
CreateOrderInfo
:
r
"""
创建订单时的订单信息 POJO类
"""
instance
:
InstanceInfoInCreateOrderInfo
=
None
product_type_id
:
str
=
None
product_type_name
:
str
=
None
product_code
:
str
=
None
product_name
:
str
=
None
crn
:
str
=
None
region_code
:
str
=
None
open_product_time
:
str
=
None
creater_user_id
:
str
=
None
account_id
:
str
=
None
manage_url
:
str
=
None
def
dict_for_api
(
self
)
->
dict
:
r
"""
转换为调用API时需要的字典(json)对象
:return:
"""
ret_dict
=
dict
()
ret_dict
[
"instance"
]
=
self
.
instance
.
dict_for_api
()
if
self
.
instance
else
None
ret_dict
[
"productTypeID"
]
=
self
.
product_type_id
ret_dict
[
"productTypeName"
]
=
self
.
product_type_name
ret_dict
[
"productCode"
]
=
self
.
product_code
ret_dict
[
"productName"
]
=
self
.
product_name
ret_dict
[
"crn"
]
=
self
.
crn
ret_dict
[
"regionCode"
]
=
self
.
region_code
ret_dict
[
"openProductTime"
]
=
self
.
open_product_time
ret_dict
[
"createrUserID"
]
=
self
.
creater_user_id
ret_dict
[
"accountID"
]
=
self
.
account_id
ret_dict
[
"manageUrl"
]
=
self
.
manage_url
return
ret_dict
def
__str__
(
self
):
return
Utility
.
dict2jsonstr
(
self
.
__dict__
)
cucc_common_pkg/ludq_utils/utils_tg_billing/billing_feededuct_info.py
0 → 100644
View file @
11b2601e
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
"""
@Version: 1.0
@Python Version:3.6.6
@Author: ludq1
@Email: ludq1@chinaunicom.cn
@date: 2023/04/07 11:40:00
@Description:
"""
from
...globalutility
import
Utility
class
FeedeductDetailInfo
:
r
"""
扣费信息 POJO类
"""
instance_id
:
str
=
None
account_id
:
str
=
None
amount
:
str
=
None
product_code
:
str
=
None
product_name
:
str
=
None
source_trade_id
:
str
=
None
start_time
:
str
=
None
end_time
:
str
=
None
pay_mode
:
int
=
None
failure_reason
:
str
=
None
r
"""
调用账单中心批量费用反馈API时返回的执行失败的费用明细时附加的失败原因
"""
def
dict_for_api
(
self
)
->
dict
:
r
"""
转换为调用API时需要的字典(json)对象
:return:
"""
ret_dict
=
dict
()
ret_dict
[
"instanceID"
]
=
self
.
instance_id
ret_dict
[
"accountID"
]
=
self
.
account_id
ret_dict
[
"amount"
]
=
self
.
amount
ret_dict
[
"productCode"
]
=
self
.
product_code
ret_dict
[
"productName"
]
=
self
.
product_name
ret_dict
[
"sourceTradeID"
]
=
self
.
source_trade_id
ret_dict
[
"startTime"
]
=
self
.
start_time
ret_dict
[
"endTime"
]
=
self
.
end_time
ret_dict
[
"payMode"
]
=
self
.
pay_mode
return
ret_dict
def
__str__
(
self
):
return
Utility
.
dict2jsonstr
(
self
.
__dict__
)
cucc_common_pkg/ludq_utils/utils_tg_billing/billing_update_order_info.py
0 → 100644
View file @
11b2601e
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
"""
@Version: 1.0
@Python Version:3.6.6
@Author: ludq1
@Email: ludq1@chinaunicom.cn
@date: 2023/04/07 11:40:00
@Description:
"""
from
...globalutility
import
Utility
class
UpdateOrderInfo
:
r
"""
更新订单时的订单信息 POJO类
"""
instance_id
:
str
=
None
region_code
:
str
=
None
product_code
:
str
=
None
instance_state
:
str
=
None
message
:
str
=
None
def
dict_for_api
(
self
)
->
dict
:
r
"""
转换为调用API时需要的字典(json)对象
:return:
"""
ret_dict
=
dict
()
ret_dict
[
"instanceID"
]
=
self
.
instance_id
ret_dict
[
"region"
]
=
self
.
region_code
ret_dict
[
"productCode"
]
=
self
.
product_code
ret_dict
[
"state"
]
=
self
.
instance_state
ret_dict
[
"message"
]
=
self
.
message
return
ret_dict
def
__str__
(
self
):
return
Utility
.
dict2jsonstr
(
self
.
__dict__
)
cucc_common_pkg/ludq_utils/utils_tg_billing/billing_utils.py
0 → 100644
View file @
11b2601e
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
"""
@Version: 1.0
@Python Version:3.6.6
@Author: ludq1
@Email: ludq1@chinaunicom.cn
@date: 2023/04/07 11:40:00
@Description:
"""
import
hashlib
from
typing
import
List
,
Dict
,
Union
from
.billing_create_order_info
import
CreateOrderInfo
from
.billing_feededuct_info
import
FeedeductDetailInfo
from
.billing_update_order_info
import
UpdateOrderInfo
from
..common_app_config
import
CommonAppConfig
from
..utils_base
import
UtilityBaseV2
from
..utils_http.http_utils
import
HttpUtils
from
...globalutility
import
Utility
from
...my_stringutils
import
MyStringUtils
class
BillingUtils
:
r
"""
天宫账单中心 API封装功能类,依赖HttpUtils类
针对3种不同的场景,API调用流程大致如下:
(1) 开通实例: 创建订单 -> (创建实例)-> 更新订单 -> 按需同步费用
(2) 扩缩容实例: 按需同步费用 -> 创建订单 -> (扩缩容实例)-> 更新订单 -> 按需同步费用
(3) 删除实例: 同步费用->(删除实例)->更新订单
(4) 每日定时同步所有实例的周期费用
"""
def
__init__
(
self
,
**
kwargs
):
r
"""
初始化
:key billing_api: 示例 https://billing.tg.unicom.local/billing
:key app_id
:key secret_key
:key http_utils
:key incoming_request
:key incoming_request_headers: 透传的 incoming_request.headers
:key app_utils
:key logger: 用于打印日志
"""
billing_api
=
kwargs
.
get
(
"billing_api"
)
app_id
=
kwargs
.
get
(
"app_id"
)
secret_key
=
kwargs
.
get
(
"secret_key"
)
http_utils
=
kwargs
.
get
(
"http_utils"
)
incoming_request
=
kwargs
.
get
(
"incoming_request"
)
app_utils
=
kwargs
.
get
(
"app_utils"
)
if
not
app_id
:
app_id
=
MyStringUtils
.
EMPTY
if
not
secret_key
:
secret_key
=
MyStringUtils
.
EMPTY
self
.
billing_api
:
str
=
billing_api
self
.
app_id
:
str
=
app_id
self
.
secret_key
:
str
=
secret_key
self
.
http_utils
:
HttpUtils
=
http_utils
self
.
incoming_request
=
incoming_request
if
not
http_utils
:
self
.
http_utils
=
HttpUtils
(
incoming_request
=
incoming_request
,
incoming_request_headers
=
kwargs
.
get
(
"incoming_request_headers"
),
logger
=
kwargs
.
get
(
"logger"
)
or
CommonAppConfig
().
common_logger
,
)
self
.
logger
=
kwargs
.
get
(
"logger"
)
or
CommonAppConfig
().
common_logger
if
not
app_utils
:
app_utils
=
UtilityBaseV2
(
logger
=
self
.
logger
)
self
.
app_utils
=
app_utils
def
create_order
(
self
,
order_info
:
CreateOrderInfo
)
->
str
:
r
"""
各产品中心通过自己控制台创建订单或扩缩容某个实例时,需要调用此接口将开通实例的相关信息传递给账单中心。
返回订单编号
:param order_info:
:return:
"""
# 声明基本参数
action
=
f
"调用billing_api:post
{
self
.
billing_api
}
/v1/order "
# 如果没有提供 accessToken 直接抛出异常
if
order_info
is
None
:
raise
RuntimeError
(
Utility
.
join_str
(
action
,
"未提供 order_info"
))
# 声明统一使用的变量
apiurl_full
=
Utility
.
join_str
(
self
.
billing_api
.
rstrip
(
'/'
)
if
self
.
billing_api
else
MyStringUtils
.
EMPTY
,
"/v1/order"
)
post_body
=
UtilityBaseV2
.
dict2jsonstr
(
order_info
.
dict_for_api
(),
sort_keys
=
True
,
remove_key_of_none_value
=
True
)
map_cookie
=
self
.
gen_cookie_for_sign
(
post_body
)
resp
=
self
.
http_utils
.
do_http_post
(
apiurl_full
,
verify
=
False
,
cookies
=
map_cookie
,
data
=
post_body
)
# 判断状态码等内容
detail_info
=
Utility
.
join_str
(
"post urlStr="
,
apiurl_full
,
" data="
,
post_body
,
" map_cookie="
,
Utility
.
dict2jsonstr
(
map_cookie
))
data_dict_in_billing_response
=
self
.
app_utils
.
check_resp_status_code_and_code
(
action
,
detail_info
,
resp
)
return
data_dict_in_billing_response
.
get
(
"orderNo"
)
def
update_order
(
self
,
order_info
:
UpdateOrderInfo
):
r
"""
各产品中心需要调用此接口更新订单状态。
调用API失败会抛出异常
:param order_info:
:return:
"""
# 声明基本参数
action
=
f
"调用billing_api:put
{
self
.
billing_api
}
/v1/order "
# 如果没有提供 accessToken 直接抛出异常
if
order_info
is
None
:
raise
RuntimeError
(
Utility
.
join_str
(
action
,
"未提供 order_info"
))
# 声明统一使用的变量
apiurl_full
=
Utility
.
join_str
(
self
.
billing_api
.
rstrip
(
'/'
)
if
self
.
billing_api
else
MyStringUtils
.
EMPTY
,
"/v1/order"
)
post_body
=
UtilityBaseV2
.
dict2jsonstr
(
order_info
.
dict_for_api
(),
sort_keys
=
True
,
remove_key_of_none_value
=
True
)
map_cookie
=
self
.
gen_cookie_for_sign
(
post_body
)
resp
=
self
.
http_utils
.
do_http_trans
(
method
=
'PUT'
,
url
=
apiurl_full
,
verify
=
False
,
cookies
=
map_cookie
,
data
=
post_body
)
# 判断状态码等内容
detail_info
=
Utility
.
join_str
(
"put urlStr="
,
apiurl_full
,
" data="
,
post_body
,
" map_cookie="
,
Utility
.
dict2jsonstr
(
map_cookie
))
self
.
app_utils
.
check_resp_status_code_and_code
(
action
,
detail_info
,
resp
,
b_check_data_in_app_response
=
False
)
def
feededuct_detail
(
self
,
feededuct_info
:
FeedeductDetailInfo
):
r
"""
各产品中心需要调用此接口将用户开通实例的扣费信息实时传递给Billing Center。。
调用API失败会抛出异常
:param feededuct_info:
:return:
"""
# 声明基本参数
action
=
f
"调用billing_api:post
{
self
.
billing_api
}
/v1/feedeductdetail "
# 如果没有提供 accessToken 直接抛出异常
if
feededuct_info
is
None
:
raise
RuntimeError
(
Utility
.
join_str
(
action
,
"未提供 feededuct_info"
))
# 声明统一使用的变量
apiurl_full
=
Utility
.
join_str
(
self
.
billing_api
.
rstrip
(
'/'
)
if
self
.
billing_api
else
MyStringUtils
.
EMPTY
,
"/v1/feedeductdetail"
)
post_body
=
UtilityBaseV2
.
dict2jsonstr
(
feededuct_info
.
dict_for_api
(),
sort_keys
=
True
,
remove_key_of_none_value
=
True
,
)
map_cookie
=
self
.
gen_cookie_for_sign
(
post_body
)
resp
=
self
.
http_utils
.
do_http_post
(
url
=
apiurl_full
,
verify
=
False
,
cookies
=
map_cookie
,
data
=
post_body
)
# 判断状态码等内容
detail_info
=
Utility
.
join_str
(
"post urlStr="
,
apiurl_full
,
" data="
,
post_body
,
" map_cookie="
,
Utility
.
dict2jsonstr
(
map_cookie
))
self
.
app_utils
.
check_resp_status_code_and_code
(
action
,
detail_info
,
resp
,
b_check_data_in_app_response
=
False
)
def
batch_feededuct_detail
(
self
,
feededuct_info_list
:
List
[
Union
[
Dict
,
FeedeductDetailInfo
]]
)
->
List
[
Union
[
Dict
,
FeedeductDetailInfo
]]:
r
"""
各产品中心需要调用此接口将用户开通实例的批量扣费 信息实时传递给Billing Center。适用场景为:多实例费用同步。
调用API失败会抛出异常
API返回的data包含returnResultSet和operationalFailureResult两个key,分别表示原始的要同步的费用明细列表,以及失败的明细列表,
调用方应该将同步失败的明细通知管理员介入查看原因 或 重新进行同步,
该函数返回同步失败的费用明细list
:param feededuct_info_list:
:return:
"""
# 声明基本参数
action
=
f
"调用billing_api:post
{
self
.
billing_api
}
/v1/batchfeedeductdetail "
# 如果没有提供 accessToken 直接抛出异常
if
feededuct_info_list
is
None
:
raise
RuntimeError
(
Utility
.
join_str
(
action
,
"未提供 feededuct_info_list"
))
# 声明统一使用的变量
apiurl_full
=
Utility
.
join_str
(
self
.
billing_api
.
rstrip
(
'/'
)
if
self
.
billing_api
else
MyStringUtils
.
EMPTY
,
"/v1/batchfeedeductdetail"
)
post_body
=
UtilityBaseV2
.
list2jsonstr
(
feededuct_info_list
,
sort_keys
=
True
,
remove_key_of_none_value
=
True
,
attr_name_when_value_is_obj
=
'dict_for_api'
,
)
map_cookie
=
self
.
gen_cookie_for_sign
(
post_body
)
resp
=
self
.
http_utils
.
do_http_post
(
url
=
apiurl_full
,
verify
=
False
,
cookies
=
map_cookie
,
data
=
post_body
)
# 判断状态码等内容
detail_info
=
Utility
.
join_str
(
"post urlStr="
,
apiurl_full
,
" data="
,
post_body
,
" map_cookie="
,
Utility
.
dict2jsonstr
(
map_cookie
))
data_dict_in_billing_response
:
dict
=
self
.
app_utils
.
check_resp_status_code_and_code
(
action
,
detail_info
,
resp
)
failed_list_in_dict
=
data_dict_in_billing_response
.
get
(
'operationalFailureResult'
)
def
is_feededuct_failed
(
a_feededuct_info
):
# 如果失败的列表为空,则没有失败的实例
if
not
failed_list_in_dict
:
return
False
if
isinstance
(
a_feededuct_info
,
FeedeductDetailInfo
):
source_trade_id
=
a_feededuct_info
.
source_trade_id
else
:
source_trade_id
=
a_feededuct_info
.
get
(
'sourceTradeID'
)
for
tmp_instance
in
failed_list_in_dict
:
if
source_trade_id
==
tmp_instance
.
get
(
'sourceTradeID'
):
# 返回true之前设置失败原因
if
isinstance
(
a_feededuct_info
,
FeedeductDetailInfo
):
a_feededuct_info
.
failure_reason
=
tmp_instance
.
get
(
'failureReason'
)
else
:
a_feededuct_info
[
'failureReason'
]
=
tmp_instance
.
get
(
'failureReason'
)
return
True
return
False
failed_list
=
list
(
filter
(
is_feededuct_failed
,
feededuct_info_list
))
return
failed_list
def
gen_cookie_for_sign
(
self
,
post_body
:
str
):
r
"""
生成cookie中的sign信息
:param post_body:
:return:
"""
md5_obj
=
hashlib
.
md5
()
md5_obj
.
update
(
Utility
.
join_str
(
post_body
,
self
.
secret_key
,
separator_str
=
'/'
).
encode
(
'utf-8'
))
sign
=
md5_obj
.
hexdigest
()
result
=
dict
()
result
[
'appID'
]
=
self
.
app_id
result
[
'sign'
]
=
sign
return
result
cucc_common_pkg/ludq_utils/utils_tg_configcenter/__init__.py
0 → 100644
View file @
11b2601e
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
"""
@Version: 1.0
@Python Version:3.6.6
@Author: ludq1
@Email: ludq1@chinaunicom.cn
@date: 2023/04/07 11:40:00
@Description:
"""
cucc_common_pkg/ludq_utils/utils_tg_configcenter/configcenter_region_quota.py
0 → 100644
View file @
11b2601e
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
"""
@Version: 1.0
@Python Version:3.6.6
@Author: ludq1
@Email: ludq1@chinaunicom.cn
@date: 2023/04/07 11:40:00
@Description:
"""
from
typing
import
Optional
from
..app_exception
import
AppException
from
...globalutility
import
Utility
class
ConfigCenterRegion
:
r
"""
配置中心 API 返回的可访问区域 POJO类
"""
def
__init__
(
self
):
self
.
region_code
:
Optional
[
str
]
=
None
self
.
region_name
:
Optional
[
str
]
=
None
self
.
is_authorized
:
Optional
[
bool
]
=
None
def
__str__
(
self
):
return
Utility
.
dict2jsonstr
(
self
.
__dict__
)
class
CsmConfigCenterQuota
:
r
"""
配置中心 API 返回的CSM配额 POJO类,由应用自定义
"""
acct_quota_mem_mb
:
int
=
0
acct_quota_cpu_vcore
:
int
=
0
acct_quota_disk_mb
:
int
=
0
def
__init__
(
self
,
**
kwargs
):
r
"""
初始化
:param kwargs:
:key acct_quota_mem_mb:
:key mem:
:key acct_quota_cpu_vcore:
:key cpu:
:key acct_quota_disk_mb:
:key disk:
"""
acct_quota_mem_mb
=
kwargs
.
get
(
"acct_quota_mem_mb"
)
if
acct_quota_mem_mb
is
None
:
acct_quota_mem_mb
=
kwargs
.
get
(
"mem"
)
if
acct_quota_mem_mb
is
not
None
:
self
.
acct_quota_mem_mb
=
int
(
acct_quota_mem_mb
)
or
0
else
:
self
.
acct_quota_mem_mb
=
0
acct_quota_cpu_vcore
=
kwargs
.
get
(
"acct_quota_cpu_vcore"
)
if
acct_quota_cpu_vcore
is
None
:
acct_quota_cpu_vcore
=
kwargs
.
get
(
"cpu"
)
if
acct_quota_cpu_vcore
is
not
None
:
self
.
acct_quota_cpu_vcore
=
int
(
acct_quota_cpu_vcore
)
or
0
else
:
self
.
acct_quota_cpu_vcore
=
0
acct_quota_disk_mb
=
kwargs
.
get
(
"acct_quota_disk_mb"
)
if
acct_quota_disk_mb
is
None
:
acct_quota_disk_mb
=
kwargs
.
get
(
"disk"
)
if
acct_quota_disk_mb
is
not
None
:
self
.
acct_quota_disk_mb
=
int
(
acct_quota_disk_mb
)
or
0
else
:
self
.
acct_quota_disk_mb
=
0
def
__str__
(
self
):
return
Utility
.
dict2jsonstr
(
self
.
__dict__
)
def
__sub__
(
self
,
other
):
return
CsmConfigCenterQuota
(
acct_quota_cpu_vcore
=
self
.
acct_quota_cpu_vcore
-
other
.
acct_quota_cpu_vcore
,
acct_quota_mem_mb
=
self
.
acct_quota_mem_mb
-
other
.
acct_quota_mem_mb
,
acct_quota_disk_mb
=
self
.
acct_quota_disk_mb
-
other
.
acct_quota_disk_mb
,
)
def
is_all_quota_le_zero
(
self
)
->
bool
:
r
"""
判断 CsmConfigCenterQuota 的所有配额项是否为 小于等于0
"""
if
self
.
acct_quota_cpu_vcore
>
0
:
return
False
if
self
.
acct_quota_mem_mb
>
0
:
return
False
if
self
.
acct_quota_disk_mb
>
0
:
return
False
return
True
def
check_quota_ge_zero
(
self
,
app_code
:
str
=
"QuotaNotEnough"
):
r
"""
检查 CsmConfigCenterQuota , 当任意配额项小于0时,抛出异常
:param app_code:
"""
if
self
.
acct_quota_cpu_vcore
<
0
:
raise
AppException
(
code
=
app_code
or
"QuotaNotEnough"
,
message
=
"配额(cpu)不足"
)
elif
self
.
acct_quota_mem_mb
<
0
:
raise
AppException
(
code
=
app_code
or
"QuotaNotEnough"
,
message
=
"配额(mem)不足"
)
elif
self
.
acct_quota_disk_mb
<
0
:
raise
AppException
(
code
=
app_code
or
"QuotaNotEnough"
,
message
=
"配额(disk)不足"
)
class
ConfigCenterQuota
:
r
"""
配置中心 API 返回的配额 POJO类,由应用自定义
"""
def
__init__
(
self
):
self
.
app_max_nums
:
Optional
[
int
]
=
None
def
__str__
(
self
):
return
Utility
.
dict2jsonstr
(
self
.
__dict__
)
cucc_common_pkg/ludq_utils/utils_tg_configcenter/configcenter_response.py
0 → 100644
View file @
11b2601e
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
"""
@Version: 1.0
@Python Version:3.6.6
@Author: ludq1
@Email: ludq1@chinaunicom.cn
@date: 2023/04/07 11:40:00
@Description:
"""
from
typing
import
List
,
Optional
from
.configcenter_region_quota
import
ConfigCenterRegion
from
...globalutility
import
Utility
class
ConfigCenterResponse
:
r
"""
配置中心 API 返回的Data内容 POJO类,不依赖任何其他应用基础类
"""
is_config
:
bool
=
None
quota
:
dict
=
None
region_list
:
List
[
ConfigCenterRegion
]
=
None
product_code
:
str
=
None
def
__init__
(
self
):
self
.
is_config
:
Optional
[
bool
]
=
None
r
"""
是否有配置,配置中心返回 code=NotConfig 时 is_config=False
"""
self
.
quota
:
Optional
[
dict
]
=
None
r
"""
配置中心配额API返回的账户的配额
"""
self
.
region_list
:
Optional
[
List
[
ConfigCenterRegion
]]
=
None
r
"""
配置中心可访问区域API返回的账户的可访问区域列表
"""
self
.
product_code
:
Optional
[
str
]
=
None
def
__str__
(
self
):
return
Utility
.
dict2jsonstr
(
self
.
__dict__
)
cucc_common_pkg/ludq_utils/utils_tg_configcenter/configcenter_utils.py
0 → 100644
View file @
11b2601e
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
"""
@Version: 1.0
@Python Version:3.6.6
@Author: ludq1
@Email: ludq1@chinaunicom.cn
@date: 2023/04/07 11:40:00
@Description:
"""
from
typing
import
List
,
Optional
from
.configcenter_region_quota
import
ConfigCenterRegion
from
.configcenter_response
import
ConfigCenterResponse
from
..app_exception
import
AppRuntimeException
from
..app_response
import
AppResponse
from
..base_const
import
ConstResponseCode
from
..common_app_config
import
CommonAppConfig
from
..utils_base
import
UtilityBaseV2
from
..utils_http.http_utils
import
HttpUtils
from
...globalutility
import
Utility
from
...my_stringutils
import
MyStringUtils
class
ConfigCenterUtils
:
r
"""
天宫配置中心 API封装功能类,依赖HttpUtils类
"""
def
__init__
(
self
,
configcenter_api
:
str
,
access_token
:
str
,
http_utils
:
HttpUtils
=
None
,
incoming_request
=
None
,
**
kwargs
,
):
r
"""
初始化
:key configcenter_api: 示例 https://configcenter.tg.unicom.local/configcenter
:key http_utils
:key incoming_request
:key incoming_request_headers: 透传的 incoming_request.headers
:key access_token:
:key app_utils:
:key logger: 用于打印日志
"""
self
.
configcenter_api
:
str
=
configcenter_api
self
.
access_token
:
str
=
access_token
self
.
http_utils
:
HttpUtils
=
http_utils
self
.
incoming_request
=
incoming_request
if
not
http_utils
:
self
.
http_utils
=
HttpUtils
(
incoming_request
=
incoming_request
,
incoming_request_headers
=
kwargs
.
get
(
"incoming_request_headers"
),
logger
=
kwargs
.
get
(
"logger"
)
or
CommonAppConfig
().
common_logger
,
)
self
.
logger
=
kwargs
.
get
(
"logger"
)
or
CommonAppConfig
().
common_logger
app_utils
=
kwargs
.
get
(
"app_utils"
)
if
not
app_utils
:
app_utils
=
UtilityBaseV2
(
logger
=
self
.
logger
)
self
.
app_utils
=
app_utils
def
get_acct_quota
(
self
,
product_code
:
str
,
region_code
:
str
)
->
ConfigCenterResponse
:
r
"""
获取访问者所属账户的配额,
返回的 ConfigCenterResponse 中的有效字段是 is_config 和 quota
:param product_code:
:param region_code:
:return:
"""
# 声明基本参数
action
=
f
"调用configcenter_api:get
{
self
.
configcenter_api
}
/v1/product/
{
product_code
}
/region/
{
region_code
}
/quota "
# 如果没有提供 accessToken 直接抛出异常
if
not
self
.
access_token
:
raise
RuntimeError
(
Utility
.
join_str
(
action
,
"未提供access_token"
))
if
not
product_code
:
raise
RuntimeError
(
Utility
.
join_str
(
action
,
"未提供product_code"
))
if
not
region_code
:
raise
RuntimeError
(
Utility
.
join_str
(
action
,
"未提供region_code"
))
# 声明统一使用的变量
apiurl_full
=
Utility
.
join_str
(
self
.
configcenter_api
.
rstrip
(
'/'
)
if
self
.
configcenter_api
else
MyStringUtils
.
EMPTY
,
"/v1/product/"
,
product_code
,
"/region/"
,
region_code
,
"/quota"
,
)
map_cookie
=
dict
()
map_cookie
[
'accessToken'
]
=
self
.
access_token
do_http_info
=
f
"请求CONFIGCENTER API:GET
{
apiurl_full
}
, cookies=
{
map_cookie
}
"
resp
=
self
.
http_utils
.
do_http_get_with_log_time
(
do_http_info
=
do_http_info
,
url
=
apiurl_full
,
verify
=
False
,
cookies
=
map_cookie
,
)
# 判断状态码等内容
detail_info
=
Utility
.
join_str
(
"get urlStr="
,
apiurl_full
,
" accessToken="
,
self
.
access_token
,
)
resp_dict
=
self
.
app_utils
.
check_resp_status_code_and_content
(
action
,
detail_info
,
resp
)
# 转换成标准的 AppResponse
app_resonse
=
AppResponse
.
from_dict
(
resp_dict
)
if
"NotConfig"
==
app_resonse
.
code
:
return
self
.
gen_configcenter_response_for_not_config
(
product_code
=
product_code
)
elif
ConstResponseCode
.
CODE_OK
!=
app_resonse
.
code
:
raise
AppRuntimeException
(
Utility
.
join_str
(
action
,
"失败,返回的code="
,
app_resonse
.
code
,
" message="
,
app_resonse
.
message
,
),
Utility
.
join_str
(
action
,
"返回的code="
,
app_resonse
.
code
,
" message="
,
app_resonse
.
message
,
" "
,
detail_info
,
)
)
configcenter_response_dict
=
app_resonse
.
data
# 如果未获取到data,直接抛出异常
if
configcenter_response_dict
is
None
:
raise
AppRuntimeException
(
Utility
.
join_str
(
action
,
"失败"
),
Utility
.
join_str
(
action
,
"返回的内容不包含data数据,content="
,
Utility
.
dict2jsonstr
(
resp_dict
),
" "
,
detail_info
,
)
)
# 转换为 ConfigCenterResponse
return
self
.
gen_configcenter_response_from_configcenter_quota_response
(
configcenter_response_dict
,
product_code
=
product_code
,
)
def
get_acct_region_list
(
self
,
product_code
:
str
,
region_code
:
str
=
None
,
is_authorized
:
Optional
[
bool
]
=
None
,
)
->
ConfigCenterResponse
:
r
"""
获取访问者所属账户的可访问区域列表
返回的 ConfigCenterResponse 中的有效字段是 is_config 和 region_list
:param product_code:
:param region_code: 仅返回指定的区域
:param is_authorized: 仅返回可访问区域或不可访问区域
:return:
"""
# 声明基本参数
action
=
f
"调用configcenter_api:get
{
self
.
configcenter_api
}
/v1/product/
{
product_code
}
/regions "
# 如果没有提供 accessToken 直接抛出异常
if
not
self
.
access_token
:
raise
RuntimeError
(
Utility
.
join_str
(
action
,
"未提供access_token"
))
if
not
product_code
:
raise
RuntimeError
(
Utility
.
join_str
(
action
,
"未提供product_code"
))
# 声明统一使用的变量
apiurl_full
=
Utility
.
join_str
(
self
.
configcenter_api
.
rstrip
(
'/'
)
if
self
.
configcenter_api
else
MyStringUtils
.
EMPTY
,
"/v1/product/"
,
product_code
,
"/regions"
,
)
map_cookie
=
dict
()
map_cookie
[
'accessToken'
]
=
self
.
access_token
query_args_dict
=
dict
()
if
region_code
:
query_args_dict
[
'regionCode'
]
=
region_code
if
is_authorized
is
not
None
:
query_args_dict
[
'isAuthorized'
]
=
is_authorized
do_http_info
=
f
"请求CONFIGCENTER API:GET
{
apiurl_full
}
, query=
{
query_args_dict
}
"
resp
=
self
.
http_utils
.
do_http_get_with_log_time
(
do_http_info
=
do_http_info
,
url
=
apiurl_full
,
verify
=
False
,
cookies
=
map_cookie
,
params
=
query_args_dict
,
)
# 判断状态码等内容
detail_info
=
Utility
.
join_str
(
"get urlStr="
,
apiurl_full
,
" params="
,
Utility
.
dict2jsonstr
(
query_args_dict
),
" accessToken="
,
self
.
access_token
,
)
resp_dict
=
self
.
app_utils
.
check_resp_status_code_and_content
(
action
,
detail_info
,
resp
)
# 转换成标准的 AppResponse
app_resonse
=
AppResponse
.
from_dict
(
resp_dict
)
if
"NotConfig"
==
app_resonse
.
code
:
return
self
.
gen_configcenter_response_for_not_config
(
product_code
=
product_code
)
elif
ConstResponseCode
.
CODE_OK
!=
app_resonse
.
code
:
raise
AppRuntimeException
(
Utility
.
join_str
(
action
,
"失败,返回的code="
,
app_resonse
.
code
,
" message="
,
app_resonse
.
message
),
Utility
.
join_str
(
action
,
"返回的code="
,
app_resonse
.
code
,
" message="
,
app_resonse
.
message
,
" "
,
detail_info
))
configcenter_response_list
=
app_resonse
.
data
# 如果未获取到可访问区域信息,直接抛出异常
if
configcenter_response_list
is
None
:
raise
AppRuntimeException
(
Utility
.
join_str
(
action
,
"失败"
),
Utility
.
join_str
(
action
,
"返回的内容不包含data数据,content="
,
Utility
.
dict2jsonstr
(
resp_dict
),
" "
,
detail_info
))
# 转换为 ConfigCenterResponse
return
self
.
gen_configcenter_response_from_configcenter_region_response
(
configcenter_response_list
,
product_code
=
product_code
)
def
get_acct_region_list_for_multi_product
(
self
,
product_code_list
:
str
)
->
List
[
ConfigCenterResponse
]:
r
"""
获取访问者所属账户的可访问区域列表
返回的 ConfigCenterResponse 中的有效字段是 is_config 和 region_list
:param product_code_list: 使用逗号分隔的多个product_code
:return:
"""
# 声明基本参数
action
=
f
"调用configcenter_api:get
{
self
.
configcenter_api
}
/v1/product/productregions "
# 如果没有提供 accessToken 直接抛出异常
if
not
self
.
access_token
:
raise
RuntimeError
(
Utility
.
join_str
(
action
,
"未提供access_token"
))
if
not
product_code_list
:
raise
RuntimeError
(
Utility
.
join_str
(
action
,
"未提供product_code_list"
))
# 声明统一使用的变量
apiurl_full
=
Utility
.
join_str
(
self
.
configcenter_api
.
rstrip
(
'/'
)
if
self
.
configcenter_api
else
MyStringUtils
.
EMPTY
,
"/v1/product/productregions"
)
map_cookie
=
dict
()
map_cookie
[
'accessToken'
]
=
self
.
access_token
query_args_dict
=
dict
()
query_args_dict
[
'productCode'
]
=
product_code_list
do_http_info
=
f
"请求CONFIGCENTER API:GET
{
apiurl_full
}
, query=
{
query_args_dict
}
"
resp
=
self
.
http_utils
.
do_http_get_with_log_time
(
do_http_info
=
do_http_info
,
url
=
apiurl_full
,
verify
=
False
,
cookies
=
map_cookie
,
params
=
query_args_dict
,
)
# 判断状态码等内容
detail_info
=
Utility
.
join_str
(
"get urlStr="
,
apiurl_full
,
" params="
,
Utility
.
dict2jsonstr
(
query_args_dict
),
" accessToken="
,
self
.
access_token
,
)
resp_dict
=
self
.
app_utils
.
check_resp_status_code_and_content
(
action
,
detail_info
,
resp
)
# 转换成标准的 AppResponse
app_resonse
=
AppResponse
.
from_dict
(
resp_dict
)
if
ConstResponseCode
.
CODE_OK
!=
app_resonse
.
code
:
raise
AppRuntimeException
(
Utility
.
join_str
(
action
,
"失败,返回的code="
,
app_resonse
.
code
,
" message="
,
app_resonse
.
message
,
),
Utility
.
join_str
(
action
,
"返回的code="
,
app_resonse
.
code
,
" message="
,
app_resonse
.
message
,
" "
,
detail_info
,
)
)
app_response_list
=
app_resonse
.
data
# 如果未获取到可访问区域信息,直接抛出异常
if
app_response_list
is
None
:
raise
AppRuntimeException
(
Utility
.
join_str
(
action
,
"失败"
),
Utility
.
join_str
(
action
,
"返回的内容不包含data数据,content="
,
Utility
.
dict2jsonstr
(
resp_dict
),
" "
,
detail_info
,
)
)
# 转换为 ConfigCenterResponse
result_list
=
list
()
for
tmp_app_response_dict
in
app_response_list
:
tmp_app_resonse
=
AppResponse
.
from_dict
(
tmp_app_response_dict
)
tmp_product_code
=
tmp_app_response_dict
.
get
(
"productCode"
)
if
"NotConfig"
==
tmp_app_resonse
.
code
:
result_list
.
append
(
self
.
gen_configcenter_response_for_not_config
(
product_code
=
tmp_product_code
))
else
:
result_list
.
append
(
self
.
gen_configcenter_response_from_configcenter_region_response
(
tmp_app_resonse
.
data
,
product_code
=
tmp_product_code
,
)
)
return
result_list
def
gen_configcenter_response_from_configcenter_quota_response
(
self
,
configcenter_response_dict
:
dict
,
product_code
=
None
,
)
->
ConfigCenterResponse
:
r
"""
根据 configcenter_api返回的配额信息dict转换成 ConfigCenterResponse 中的quota
:param product_code:
:param configcenter_response_dict:
:return:
"""
result
:
ConfigCenterResponse
=
ConfigCenterResponse
()
result
.
is_config
=
True
result
.
product_code
=
product_code
result
.
quota
=
configcenter_response_dict
return
result
def
gen_configcenter_response_from_configcenter_region_response
(
self
,
configcenter_response_list
:
list
,
product_code
=
None
,
)
->
ConfigCenterResponse
:
r
"""
根据 configcenter_api返回的区域信息dict转换成 ConfigCenterResponse 中的region
:param configcenter_response_list:
:param product_code
:return:
"""
result
:
ConfigCenterResponse
=
ConfigCenterResponse
()
result
.
is_config
=
True
result
.
product_code
=
product_code
if
not
configcenter_response_list
:
result
.
region_list
=
list
()
return
result
else
:
result
.
region_list
=
self
.
configcenter_region_list_from_list
(
configcenter_response_list
)
return
result
def
configcenter_region_from_dict
(
self
,
source_dict
:
dict
)
->
ConfigCenterRegion
:
r
"""
从IAM API返回的字典生成实例属性
:param source_dict:
:return:
"""
if
isinstance
(
source_dict
,
ConfigCenterRegion
):
return
source_dict
elif
isinstance
(
source_dict
,
str
):
source_dict
=
Utility
.
jsonstr2dict
(
source_dict
,
b_raise_error_when_illegal
=
True
)
result
:
ConfigCenterRegion
=
ConfigCenterRegion
()
result
.
region_code
=
source_dict
.
get
(
'regionCode'
)
result
.
region_name
=
source_dict
.
get
(
'regionName'
)
result
.
is_authorized
=
source_dict
.
get
(
'isAuthorized'
)
return
result
def
configcenter_region_list_from_list
(
self
,
source_list
:
list
)
->
Optional
[
List
[
ConfigCenterRegion
]]:
r
"""
从IAM API返回的字典生成实例属性
:param source_list:
:return:
"""
if
source_list
is
None
:
return
None
if
isinstance
(
source_list
,
str
):
source_list
=
Utility
.
jsonstr2list
(
source_list
)
return
[
self
.
configcenter_region_from_dict
(
tmp_source_dict
)
for
tmp_source_dict
in
source_list
]
def
gen_configcenter_response_for_not_config
(
self
,
product_code
=
None
)
->
ConfigCenterResponse
:
r
"""
生成属性 is_config=False 的 ConfigCenterResponse
:param product_code
:return:
"""
result
:
ConfigCenterResponse
=
ConfigCenterResponse
()
result
.
is_config
=
False
result
.
product_code
=
product_code
return
result
cucc_common_pkg/ludq_utils/utils_tg_iam/__init__.py
0 → 100644
View file @
11b2601e
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
"""
@Version: 1.0
@Python Version:3.6.6
@Author: ludq1
@Email: ludq1@chinaunicom.cn
@date: 2023/04/07 11:40:00
@Description:
"""
cucc_common_pkg/ludq_utils/utils_tg_iam/iam_instance.py
0 → 100644
View file @
11b2601e
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
"""
@Version: 1.0
@Python Version:3.6.6
@Author: ludq1
@Email: ludq1@chinaunicom.cn
@date: 2023/04/07 11:40:00
@Description:
"""
from
typing
import
List
from
...globalutility
import
Utility
class
IamInstanceAttribute
:
r
"""
IAM API需要的实例结构中的实例属性结构 POJO类,不依赖任何其他应用基础类
包含 attributeKey 和 attributeValue
"""
attribute_key
:
str
=
None
attribute_value
:
str
=
None
def
__str__
(
self
):
return
Utility
.
dict2jsonstr
(
self
.
__dict__
)
class
IamInstance
:
r
"""
IAM API 需要的实例结构 POJO类
包含crn和属性
"""
crn
:
str
=
None
instance_attribute
:
List
[
IamInstanceAttribute
]
=
None
def
__str__
(
self
):
return
Utility
.
dict2jsonstr
(
self
.
__dict__
)
cucc_common_pkg/ludq_utils/utils_tg_iam/iam_response.py
0 → 100644
View file @
11b2601e
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
"""
@Version: 1.0
@Python Version:3.6.6
@Author: ludq1
@Email: ludq1@chinaunicom.cn
@date: 2023/04/07 11:40:00
@Description:
"""
from
...globalutility
import
Utility
class
IamResponse
:
r
"""
IAM API 返回的Data内容 POJO类,不依赖任何其他应用基础类
"""
user_has_permission
:
bool
=
None
policy_data
=
None
instance_list
:
list
=
None
no_permission_reason
:
str
=
None
def
__str__
(
self
):
return
Utility
.
dict2jsonstr
(
self
.
__dict__
)
cucc_common_pkg/ludq_utils/utils_tg_iam/iam_utils.py
0 → 100644
View file @
11b2601e
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
"""
@Version: 1.0
@Python Version:3.6.6
@Author: ludq1
@Email: ludq1@chinaunicom.cn
@date: 2023/04/07 11:40:00
@Description:
"""
from
typing
import
Union
,
List
,
Dict
,
Optional
from
.iam_instance
import
IamInstance
,
IamInstanceAttribute
from
.iam_response
import
IamResponse
from
..app_exception
import
AppRuntimeException
from
..common_app_config
import
CommonAppConfig
from
..utils_base
import
UtilityBaseV2
from
..utils_http.http_utils
import
HttpUtils
from
...globalutility
import
Utility
from
...my_stringutils
import
MyStringUtils
class
IamUtils
:
r
"""
IAM API封装功能类,依赖HttpUtils类
"""
def
__init__
(
self
,
iam_api
:
str
,
access_token
:
str
,
http_utils
:
HttpUtils
=
None
,
incoming_request
=
None
,
**
kwargs
):
r
"""
初始化
:key iam_api: 示例 https://iam.tg.unicom.local/iam
:key http_utils
:key incoming_request
:key incoming_request_headers: 透传的 incoming_request.headers
:key access_token:
:key app_utils
:key logger: 用于打印日志
"""
self
.
iam_api
:
str
=
iam_api
self
.
access_token
:
str
=
access_token
self
.
http_utils
:
HttpUtils
=
http_utils
self
.
incoming_request
=
incoming_request
if
not
http_utils
:
self
.
http_utils
=
HttpUtils
(
incoming_request
=
incoming_request
,
incoming_request_headers
=
kwargs
.
get
(
"incoming_request_headers"
),
logger
=
kwargs
.
get
(
"logger"
)
or
CommonAppConfig
().
common_logger
,
)
self
.
logger
=
kwargs
.
get
(
"logger"
)
or
CommonAppConfig
().
common_logger
app_utils
=
kwargs
.
get
(
"app_utils"
)
if
not
app_utils
:
app_utils
=
UtilityBaseV2
(
logger
=
self
.
logger
)
self
.
app_utils
=
app_utils
def
check_user_has_permission
(
self
,
product_code
:
str
,
action_code
:
str
,
crn
:
str
=
None
)
->
IamResponse
:
r
"""
检查用户是否具有权限,
返回的IamResponse中的有效字段是 user_has_permission 和 policy_data
:param product_code:
:param action_code:
:param crn:
:return:
"""
# 声明基本参数
action
=
f
"调用iam_api:get
{
self
.
iam_api
}
/v2/checkuserhaspermission "
# 如果没有提供 accessToken 直接抛出异常
if
not
self
.
access_token
:
raise
AppRuntimeException
(
message
=
"进行IAM鉴权时未提供 access_token"
,
detail
=
Utility
.
join_str
(
action
,
"未提供access_token"
),
)
# 声明统一使用的变量
apiurl_full
=
Utility
.
join_str
(
self
.
iam_api
.
rstrip
(
'/'
)
if
self
.
iam_api
else
MyStringUtils
.
EMPTY
,
"/v2/checkuserhaspermission"
,
)
map_cookie
=
dict
()
map_cookie
[
'accessToken'
]
=
self
.
access_token
query_args_dict
=
dict
()
query_args_dict
[
"productCode"
]
=
product_code
or
MyStringUtils
.
EMPTY
query_args_dict
[
"actionCode"
]
=
action_code
or
MyStringUtils
.
EMPTY
query_args_dict
[
"crn"
]
=
crn
or
MyStringUtils
.
EMPTY
do_http_info
=
f
"请求IAM API:GET
{
apiurl_full
}
query=
{
query_args_dict
}
"
resp
=
self
.
http_utils
.
do_http_get_with_log_time
(
do_http_info
=
do_http_info
,
url
=
apiurl_full
,
verify
=
False
,
cookies
=
map_cookie
,
params
=
query_args_dict
,
)
# 判断状态码等内容
detail_info
=
Utility
.
join_str
(
"get urlStr="
,
apiurl_full
,
" params="
,
Utility
.
dict2jsonstr
(
query_args_dict
),
" accessToken="
,
self
.
access_token
,
)
iam_response_dict
=
self
.
app_utils
.
check_resp_status_code_and_code
(
action
,
detail_info
,
resp
)
# 转换为 IamResponse
return
self
.
gen_iam_response_from_iam_api_response
(
iam_response_dict
)
def
get_instancelist_by_attribution
(
self
,
policy_data
:
Union
[
str
,
dict
],
instance_list
:
Union
[
List
[
Dict
],
List
[
IamInstance
],
str
],
match_crn
:
bool
=
False
,
acct_id
:
str
=
None
,
)
->
IamResponse
:
r
"""
基于产品属性返回产品实例列表 , 在检查用户权限后得到的结果为 userHasPermission=true 且 policyData 不是null时调用.
返回的IamResponse中的有效字段是 instanceList
:param policy_data:
:param instance_list:
:param match_crn:
:param acct_id:
:return:
"""
# 声明基本参数
action
=
f
"调用iam_api:post
{
self
.
iam_api
}
/v2/getinstancelistbyattribution "
# 声明统一使用的变量
apiurl_full
=
Utility
.
join_str
(
self
.
iam_api
.
rstrip
(
'/'
)
if
self
.
iam_api
else
MyStringUtils
.
EMPTY
,
"/v2/getinstancelistbyattribution"
,
)
post_body_dict
=
dict
()
if
isinstance
(
policy_data
,
str
):
post_body_dict
[
"policyData"
]
=
Utility
.
jsonstr2dict
(
policy_data
,
b_raise_error_when_illegal
=
True
)
else
:
post_body_dict
[
"policyData"
]
=
policy_data
post_body_dict
[
"matchCrn"
]
=
match_crn
if
match_crn
else
False
if
acct_id
:
post_body_dict
[
"acctID"
]
=
acct_id
if
not
instance_list
:
result
:
IamResponse
=
IamResponse
()
result
.
instance_list
=
list
()
return
result
elif
isinstance
(
instance_list
,
str
):
post_body_dict
[
"instanceList"
]
=
Utility
.
jsonstr2list
(
instance_list
)
else
:
post_body_dict
[
"instanceList"
]
=
instance_list
post_body_str
=
Utility
.
dict2jsonstr
(
post_body_dict
)
do_http_info
=
f
"请求IAM API:POST
{
apiurl_full
}
data=
{
post_body_str
}
"
resp
=
self
.
http_utils
.
do_http_post_with_log_time
(
do_http_info
=
do_http_info
,
url
=
apiurl_full
,
verify
=
False
,
data
=
post_body_str
,
)
# 判断状态码等内容
detail_info
=
Utility
.
join_str
(
"post urlStr="
,
apiurl_full
,
" postbody="
,
post_body_str
,
" accessToken="
,
self
.
access_token
,
)
iam_response_dict
=
self
.
app_utils
.
check_resp_status_code_and_code
(
action
,
detail_info
,
resp
)
# 转换为 IamResponse
iam_response
:
IamResponse
=
self
.
gen_iam_response_from_iam_api_response
(
iam_response_dict
)
iam_response
.
instance_list
=
self
.
filter_instance_list_by_iam_api_ret_list
(
instance_list
,
iam_response
.
instance_list
,
)
return
iam_response
def
check_user_has_permission_and_get_instancelist
(
self
,
product_code
:
str
,
action_code
:
str
,
instance_list
:
Union
[
List
[
IamInstance
],
str
]
)
->
IamResponse
:
r
"""
作用: 检查用户操作是否具有权限并基于产品属性返回产品实例列表, 是上面2个API的结合,用于在网络传输速度较慢时,一次性得到返回结果。
返回的IamResponse中的有效字段是 userHasPermission 和 instance_list
:param product_code:
:param action_code:
:param instance_list:
:return:
"""
# 声明基本参数
action
=
f
"调用iam_api:post
{
self
.
iam_api
}
/v2/checkuserhaspermissionandgetinstancelist"
# 如果没有提供 accessToken 直接抛出异常
if
not
self
.
access_token
:
raise
RuntimeError
(
Utility
.
join_str
(
action
,
"未提供access_token"
))
# 声明统一使用的变量
apiurl_full
=
Utility
.
join_str
(
self
.
iam_api
.
rstrip
(
'/'
)
if
self
.
iam_api
else
MyStringUtils
.
EMPTY
,
"/v2/checkuserhaspermissionandgetinstancelist"
,
)
map_cookie
=
dict
()
map_cookie
[
'accessToken'
]
=
self
.
access_token
post_body_dict
=
dict
()
post_body_dict
[
"productCode"
]
=
product_code
or
MyStringUtils
.
EMPTY
post_body_dict
[
"actionCode"
]
=
action_code
or
MyStringUtils
.
EMPTY
if
not
instance_list
:
post_body_dict
[
"instanceList"
]
=
list
()
elif
isinstance
(
instance_list
,
str
):
tmp_list
=
Utility
.
jsonstr2list
(
instance_list
)
post_body_dict
[
"instanceList"
]
=
[
{
"crn"
:
tmp_dict
.
get
(
'crn'
),
"instance_attribute"
:
tmp_dict
.
get
(
'instance_attribute'
)}
for
tmp_dict
in
tmp_list
]
else
:
post_body_dict
[
"instanceList"
]
=
[
{
"crn"
:
tmp_iam_instance
.
crn
,
"instance_attribute"
:
tmp_iam_instance
.
instance_attribute
}
for
tmp_iam_instance
in
instance_list
]
post_body_str
=
Utility
.
dict2jsonstr
(
post_body_dict
)
do_http_info
=
f
"请求IAM API:POST
{
apiurl_full
}
data=
{
post_body_str
}
"
resp
=
self
.
http_utils
.
do_http_post_with_log_time
(
do_http_info
=
do_http_info
,
url
=
apiurl_full
,
verify
=
False
,
cookies
=
map_cookie
,
data
=
post_body_str
,
)
# 判断状态码等内容
detail_info
=
Utility
.
join_str
(
"post urlStr="
,
apiurl_full
,
" postbody="
,
post_body_str
,
" accessToken="
,
self
.
access_token
,
)
iam_response_dict
=
self
.
app_utils
.
check_resp_status_code_and_code
(
action
,
detail_info
,
resp
)
# 转换为 IamResponse
iam_response
:
IamResponse
=
self
.
gen_iam_response_from_iam_api_response
(
iam_response_dict
)
if
iam_response
.
user_has_permission
:
iam_response
.
instance_list
=
self
.
filter_instance_list_by_iam_api_ret_list
(
instance_list
,
iam_response
.
instance_list
,
)
return
iam_response
def
check_user_has_permission_with_instanceattribution
(
self
,
product_code
:
str
,
action_code
:
str
,
crn
:
str
,
instance_attribute
:
Union
[
List
[
Dict
],
List
[
IamInstanceAttribute
],
str
]
=
None
)
->
IamResponse
:
r
"""
作用: 检查用户对具体的某个实例是否具有权限
返回的IamResponse中的有效字段是 user_has_permission 和 no_permission_reason
:param product_code:
:param action_code:
:param crn:
:param instance_attribute:
:return:
"""
# 声明基本参数
action
=
f
"调用iam_api:get
{
self
.
iam_api
}
/v2/checkuserhaspermissionwithinstanceattribution "
# 如果没有提供 accessToken 直接抛出异常
if
not
self
.
access_token
:
raise
AppRuntimeException
(
message
=
"进行IAM鉴权时未提供 access_token"
,
detail
=
Utility
.
join_str
(
action
,
"未提供access_token"
),
)
# 声明统一使用的变量
apiurl_full
=
Utility
.
join_str
(
self
.
iam_api
.
rstrip
(
'/'
)
if
self
.
iam_api
else
MyStringUtils
.
EMPTY
,
"/v2/checkuserhaspermissionwithinstanceattribution"
,
)
map_cookie
=
dict
()
map_cookie
[
'accessToken'
]
=
self
.
access_token
query_args_dict
=
dict
()
query_args_dict
[
"productCode"
]
=
product_code
or
MyStringUtils
.
EMPTY
query_args_dict
[
"actionCode"
]
=
action_code
or
MyStringUtils
.
EMPTY
query_args_dict
[
"crn"
]
=
crn
or
MyStringUtils
.
EMPTY
if
not
instance_attribute
:
query_args_dict
[
"instanceAttribute"
]
=
'[]'
elif
isinstance
(
instance_attribute
,
str
):
query_args_dict
[
"instanceAttribute"
]
=
instance_attribute
else
:
query_args_dict
[
"instanceAttribute"
]
=
Utility
.
list2jsonstr
(
instance_attribute
)
do_http_info
=
f
"请求IAM API:GET
{
apiurl_full
}
query=
{
query_args_dict
}
"
resp
=
self
.
http_utils
.
do_http_get_with_log_time
(
do_http_info
=
do_http_info
,
url
=
apiurl_full
,
verify
=
False
,
cookies
=
map_cookie
,
params
=
query_args_dict
,
)
# 判断状态码等内容
detail_info
=
Utility
.
join_str
(
"get urlStr="
,
apiurl_full
,
" params="
,
Utility
.
dict2jsonstr
(
query_args_dict
),
" accessToken="
,
self
.
access_token
,
)
iam_response_dict
=
self
.
app_utils
.
check_resp_status_code_and_code
(
action
,
detail_info
,
resp
)
# 转换为 IamResponse
return
self
.
gen_iam_response_from_iam_api_response
(
iam_response_dict
)
def
gen_iam_response_from_iam_api_response
(
self
,
iam_response_dict
:
dict
)
->
IamResponse
:
r
"""
根据iam_api返回的权限验证结果信息dict转换成IamResponse
:param iam_response_dict:
:return:
"""
result
:
IamResponse
=
IamResponse
()
tmp_value
=
iam_response_dict
.
get
(
'userHasPermission'
)
if
tmp_value
is
not
None
:
result
.
user_has_permission
=
True
if
tmp_value
else
False
else
:
result
.
user_has_permission
=
None
tmp_value
=
iam_response_dict
.
get
(
'policyData'
)
result
.
policy_data
=
tmp_value
tmp_value
=
iam_response_dict
.
get
(
'instanceList'
)
result
.
instance_list
=
tmp_value
tmp_value
=
iam_response_dict
.
get
(
'noPermissionReason'
)
result
.
no_permission_reason
=
tmp_value
return
result
def
attribute_from_dict
(
self
,
source_dict
:
dict
)
->
IamInstanceAttribute
:
r
"""
从IAM API返回的字典生成实例属性
:param source_dict:
:return:
"""
if
isinstance
(
source_dict
,
IamInstanceAttribute
):
return
source_dict
elif
isinstance
(
source_dict
,
str
):
source_dict
=
Utility
.
jsonstr2dict
(
source_dict
,
b_raise_error_when_illegal
=
True
)
result
:
IamInstanceAttribute
=
IamInstanceAttribute
()
result
.
attribute_key
=
source_dict
.
get
(
'attributeKey'
)
result
.
attribute_value
=
source_dict
.
get
(
'attributeValue'
)
def
attribute_list_from_list
(
self
,
source_list
:
list
)
->
Optional
[
List
[
IamInstanceAttribute
]]:
r
"""
从IAM API返回的字典生成实例属性
:param source_list:
:return:
"""
if
source_list
is
None
:
return
None
if
isinstance
(
source_list
,
str
):
source_list
=
Utility
.
jsonstr2list
(
source_list
)
return
[
self
.
attribute_from_dict
(
tmp_source_dict
)
for
tmp_source_dict
in
source_list
]
def
instance_from_dict
(
self
,
source_dict
:
dict
)
->
IamInstance
:
r
"""
从IAM API返回的字典生成实例
:param source_dict:
:return:
"""
if
isinstance
(
source_dict
,
IamInstance
):
return
source_dict
elif
isinstance
(
source_dict
,
str
):
source_dict
=
Utility
.
jsonstr2dict
(
source_dict
,
b_raise_error_when_illegal
=
True
)
result
:
IamInstance
=
IamInstance
()
result
.
crn
=
source_dict
.
get
(
'crn'
)
result
.
instance_attribute
=
self
.
attribute_list_from_list
(
source_dict
.
get
(
'instanceAttribute'
))
return
result
def
instance_list_from_list
(
self
,
source_list
:
list
)
->
Optional
[
List
[
IamInstance
]]:
r
"""
从IAM API返回的字典生成实例
:param source_list:
:return:
"""
if
source_list
is
None
:
return
None
if
isinstance
(
source_list
,
str
):
source_list
=
Utility
.
jsonstr2list
(
source_list
)
return
[
self
.
instance_from_dict
(
tmp_source_dict
)
for
tmp_source_dict
in
source_list
]
def
filter_instance_list_by_iam_api_ret_list
(
self
,
ori_instance_list
:
list
,
iam_api_ret_list
:
list
)
->
Optional
[
List
[
IamInstance
]]:
r
"""
使用iam api返回的实例列表(包含crn属性)过滤原始的实例列表
:param ori_instance_list
:param iam_api_ret_list:
:return:
"""
if
ori_instance_list
is
None
:
return
None
if
not
iam_api_ret_list
:
return
ori_instance_list
if
isinstance
(
ori_instance_list
,
str
):
ori_instance_list
=
Utility
.
jsonstr2list
(
ori_instance_list
)
if
isinstance
(
iam_api_ret_list
,
str
):
iam_api_ret_list
=
Utility
.
jsonstr2list
(
iam_api_ret_list
)
iam_api_ret_list
=
self
.
instance_list_from_list
(
iam_api_ret_list
)
def
is_matched_crn
(
tmp_ori_instance
):
tmp_ori_instance
=
self
.
instance_from_dict
(
tmp_ori_instance
)
for
tmp_iam_api_ret_instance
in
iam_api_ret_list
:
if
tmp_iam_api_ret_instance
.
crn
==
tmp_ori_instance
.
crn
:
return
True
return
False
tmp_list
=
filter
(
is_matched_crn
,
ori_instance_list
)
return
list
(
tmp_list
)
cucc_common_pkg/ludq_utils/utils_tg_sso/__init__.py
0 → 100644
View file @
11b2601e
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
"""
@Version: 1.0
@Python Version:3.6.6
@Author: ludq1
@Email: ludq1@chinaunicom.cn
@date: 2023/04/07 11:40:00
@Description:
"""
cucc_common_pkg/ludq_utils/utils_tg_sso/sso_utils.py
0 → 100644
View file @
11b2601e
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
"""
@Version: 1.0
@Python Version:3.6.6
@Author: ludq1
@Email: ludq1@chinaunicom.cn
@date: 2023/04/07 11:40:00
@Description:
"""
from
typing
import
Optional
from
flask
import
Request
,
current_app
from
..app_exception
import
AppRuntimeException
from
..app_response
import
AppResponse
from
..base_const
import
ConstResponseCode
from
..common_app_config
import
CommonAppConfig
from
..logined_user
import
LoginedUserInfo
from
..utils_base
import
UtilityBaseV2
from
..utils_http.http_cookie
import
HttpCookie
from
..utils_http.http_utils
import
HttpUtils
from
...globalutility
import
Utility
from
...my_stringutils
import
MyStringUtils
class
UserInfoFromRequest
:
user_info
:
Optional
[
LoginedUserInfo
]
=
None
http_cookie
:
Optional
[
HttpCookie
]
=
None
auth_failure_reason
:
str
=
None
def
__init__
(
self
,
**
kwargs
):
self
.
user_info
=
kwargs
.
get
(
"user_info"
)
self
.
http_cookie
=
kwargs
.
get
(
"http_cookie"
)
self
.
auth_failure_reason
=
kwargs
.
get
(
"auth_failure_reason"
)
class
SsoUtils
:
r
"""
SSO API封装功能类,依赖HttpUtils类,依赖HttpUtils等一些基础应用类
"""
sso_api
:
str
=
None
app_utils
:
UtilityBaseV2
=
None
http_utils
:
HttpUtils
=
None
incoming_request
=
None
def
__init__
(
self
,
**
kwargs
):
r
"""
初始化
:key sso_api: 示例 https://sso.tg.unicom.local/sso
:key app_utils
:key http_utils
:key incoming_request_headers: 透传的 incoming_request.headers
:key incoming_request
:key logger: 用于打印日志
"""
sso_api
=
kwargs
.
get
(
"sso_api"
)
app_utils
=
kwargs
.
get
(
"app_utils"
)
http_utils
=
kwargs
.
get
(
"http_utils"
)
incoming_request
=
kwargs
.
get
(
"incoming_request"
)
if
not
sso_api
:
raise
ValueError
(
"初始化 SsoUtils 失败,未提供 sso_api"
)
if
not
app_utils
:
app_utils
=
UtilityBaseV2
(
logger
=
kwargs
.
get
(
"logger"
)
or
CommonAppConfig
().
common_logger
)
self
.
sso_api
=
sso_api
self
.
app_utils
=
app_utils
self
.
http_utils
=
http_utils
self
.
incoming_request
=
incoming_request
if
not
http_utils
:
self
.
http_utils
=
HttpUtils
(
incoming_request
=
incoming_request
,
incoming_request_headers
=
kwargs
.
get
(
"incoming_request_headers"
),
logger
=
kwargs
.
get
(
"logger"
)
or
CommonAppConfig
().
common_logger
,
)
self
.
logger
=
kwargs
.
get
(
"logger"
)
or
CommonAppConfig
().
common_logger
def
get_user_info_from_request
(
self
,
request
:
Request
,
)
->
UserInfoFromRequest
:
r
"""
获取用户信息,未登录或验证登录时发生异常时,返回的 UserInfoFromRequest中的 user_info 为 None,
正常验证通过后, 返回的 UserInfoFromRequest中的 user_info , 以及可能的http_cookie
:param request:
:return:
"""
try
:
# 从cookie中获取 accessToken 列表
access_token_list
:
list
=
self
.
get_access_token_from_request
(
request
)
# 从Bearer头中获取 bearer字符串
bearer_string
:
str
=
self
.
app_utils
.
get_bearer_str_from_request
(
request
)
# 如果没有提供accessToken和 Bearer 直接返回 None
if
not
access_token_list
and
not
bearer_string
:
return
UserInfoFromRequest
(
user_info
=
None
,
http_cookie
=
None
,
auth_failure_reason
=
"request中没有accessToken"
,
)
# 先尝试使用 accessToken 获取用户信息
userinfo_from_request
=
self
.
get_user_info_from_access_token
(
access_token_list
)
user_info
,
_
,
_
=
userinfo_from_request
.
user_info
,
userinfo_from_request
.
http_cookie
,
userinfo_from_request
.
auth_failure_reason
# 如果没有获取到用户信息, 再尝试使用 bearer_string 获取
if
not
user_info
and
bearer_string
:
userinfo_from_request
=
self
.
get_user_info_from_access_token
([
bearer_string
])
# 返回
return
userinfo_from_request
except
BaseException
as
e
:
# 记录访问结束日志
self
.
logger
.
error
(
"获取用户信息时发生异常"
)
# 打印异常堆栈信息
current_app
.
logger
.
error
(
Utility
.
join_str
(
'获取用户信息时发生异常'
))
current_app
.
logger
.
exception
(
e
,
exc_info
=
True
)
# 返回
return
UserInfoFromRequest
(
user_info
=
None
,
http_cookie
=
None
,
auth_failure_reason
=
f
"获取用户信息时发生异常:
{
e
}
"
)
def
get_access_token_from_request
(
self
,
request
)
->
list
:
r
"""
从request的Header Cookie中获取accessToken列表
:param request:
:return:
"""
cookies_map
=
self
.
app_utils
.
get_cookies_dict_from_request
(
request
,
decoding
=
False
)
return
cookies_map
.
get
(
'accessToken'
)
def
get_user_info_from_access_token
(
self
,
access_token_list
:
list
,
)
->
UserInfoFromRequest
:
r
"""
通过access_token_list获取用户信息,返回用户信息以及可能的http_cookie
:param access_token_list:
:return:
"""
# 声明基本参数
action
=
"调用ssoServer查询登陆用户信息(使用accessToken) "
# 如果没有提供 accessToken 直接返回 None
if
not
access_token_list
:
return
UserInfoFromRequest
(
user_info
=
None
,
http_cookie
=
None
,
auth_failure_reason
=
"request中没有accessToken"
)
# 预先声明获取到的用户信息json
user_info_dict
=
None
http_cookie
=
None
access_token
=
None
# 声明统一使用的变量
apiurl_full
=
Utility
.
join_str
(
self
.
sso_api
.
rstrip
(
'/'
)
if
self
.
sso_api
else
MyStringUtils
.
EMPTY
,
"/v1/users/info"
,
)
for
tmp_access_token
in
access_token_list
:
map_cookie
=
dict
()
map_cookie
[
'accessToken'
]
=
tmp_access_token
do_http_info
=
f
"请求SSO API:GET
{
apiurl_full
}
, cookies=
{
map_cookie
}
"
resp
=
self
.
http_utils
.
do_http_get_with_log_time
(
do_http_info
=
do_http_info
,
url
=
apiurl_full
,
verify
=
False
,
cookies
=
map_cookie
,
)
# 判断状态码等内容
detail_info
=
Utility
.
join_str
(
f
"请求SSO(get
{
apiurl_full
}
)"
,
)
resp_dict
=
self
.
app_utils
.
check_resp_status_code_and_content
(
action
,
detail_info
,
resp
)
# 转换成标准的 AppResponse
app_resonse
=
AppResponse
.
from_dict
(
resp_dict
)
if
ConstResponseCode
.
CODE_OK
==
app_resonse
.
code
:
# 如果返回的状态码为 OK ,则提取用户信息
user_info_dict
=
app_resonse
.
data
access_token
=
tmp_access_token
# 接收 response 中的 Cookie
for
tmp_cookie
in
resp
.
cookies
:
if
MyStringUtils
.
equals
(
'accesstoken'
,
tmp_cookie
.
name
.
lower
()):
http_cookie
=
HttpCookie
(
cookie_key
=
tmp_cookie
.
name
,
cookie_value
=
tmp_cookie
.
value
,
cookie_path
=
tmp_cookie
.
path
,
cookie_domain
=
tmp_cookie
.
domain
,
)
access_token
=
tmp_cookie
.
value
break
break
# 如果未获取到用户信息,直接返回None
if
not
user_info_dict
:
return
UserInfoFromRequest
(
user_info
=
None
,
http_cookie
=
None
)
# 转换为logindUserInfo
return
UserInfoFromRequest
(
user_info
=
self
.
gen_user_info_from_sso_api_response
(
action
,
user_info_dict
,
access_token
=
access_token
),
http_cookie
=
http_cookie
)
def
gen_user_info_from_sso_api_response
(
self
,
action
:
str
,
user_info_dict
:
dict
,
access_token
:
str
=
None
,
)
->
LoginedUserInfo
:
r
"""
根据sso_api返回的用户信息dict转换成LoginedUserInfo
:param action
:param user_info_dict:
:param access_token:
:return:
"""
result
:
LoginedUserInfo
=
LoginedUserInfo
()
tmp_value
=
user_info_dict
.
get
(
'accountID'
)
if
not
tmp_value
:
raise
AppRuntimeException
(
Utility
.
join_str
(
action
,
"失败"
),
Utility
.
join_str
(
action
,
"返回的用户内容不包括accountID,userInfo="
,
Utility
.
dict2jsonstr
(
user_info_dict
),
)
)
result
.
account_id
=
tmp_value
tmp_value
=
user_info_dict
.
get
(
'userID'
)
if
not
tmp_value
:
raise
AppRuntimeException
(
Utility
.
join_str
(
action
,
"失败"
),
Utility
.
join_str
(
action
,
"返回的用户内容不包括userID,userInfo="
,
Utility
.
dict2jsonstr
(
user_info_dict
),
)
)
result
.
user_id
=
tmp_value
tmp_value
=
user_info_dict
.
get
(
'userName'
)
if
not
tmp_value
:
raise
AppRuntimeException
(
Utility
.
join_str
(
action
,
"失败"
),
Utility
.
join_str
(
action
,
"返回的用户内容不包括userName,userInfo="
,
Utility
.
dict2jsonstr
(
user_info_dict
),
)
)
result
.
user_name
=
tmp_value
result
.
account_name
=
user_info_dict
.
get
(
'accountName'
)
result
.
mobile
=
user_info_dict
.
get
(
'mobile'
)
result
.
email
=
user_info_dict
.
get
(
'email'
)
result
.
is_root
=
user_info_dict
.
get
(
'isRoot'
)
result
.
access_token
=
access_token
return
result
cucc_common_pkg/ludq_utils/utils_tg_vpc/__init__.py
0 → 100644
View file @
11b2601e
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
"""
@Version: 1.0
@Python Version:3.6.6
@Author: ludq1
@Email: ludq1@chinaunicom.cn
@date: 2023/04/07 11:40:00
@Description:
"""
cucc_common_pkg/ludq_utils/utils_tg_vpc/vpc_utils.py
0 → 100644
View file @
11b2601e
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
"""
@Version: 1.0
@Python Version:3.6.6
@Author: ludq1
@Email: ludq1@chinaunicom.cn
@date: 2023/04/07 11:40:00
@Description:
"""
from
typing
import
List
,
Dict
from
..app_exception
import
AppRuntimeException
from
..common_app_config
import
CommonAppConfig
from
..utils_base
import
UtilityBaseV2
from
..utils_http.http_utils
import
HttpUtils
class
VpcUtils
:
r
"""
VPC API封装功能类,依赖HttpUtils类,依赖HttpUtils等一些基础应用类
"""
vpc_api
:
str
=
None
vpc_api_for_get_vpc_ip
:
List
[
str
]
=
None
app_utils
:
UtilityBaseV2
=
None
http_utils
:
HttpUtils
=
None
incoming_request
=
None
def
__init__
(
self
,
**
kwargs
):
r
"""
初始化
:key vpc_api: 示例 https://vpc.console.tg.unicom.local
:key vpc_api_for_get_vpc_ip: 示例 http://10.124.142.70:65334 或 https://vpc.console.tg.unicom.local
:key app_utils
:key http_utils
:key incoming_request_headers: 透传的 incoming_request.headers
:key incoming_request
:key logger: 用于打印日志
"""
vpc_api
=
kwargs
.
get
(
"vpc_api"
)
vpc_api_for_get_vpc_ip
=
kwargs
.
get
(
"vpc_api_for_get_vpc_ip"
)
app_utils
=
kwargs
.
get
(
"app_utils"
)
http_utils
=
kwargs
.
get
(
"http_utils"
)
incoming_request
=
kwargs
.
get
(
"incoming_request"
)
if
not
app_utils
:
app_utils
=
UtilityBaseV2
(
logger
=
kwargs
.
get
(
"logger"
)
or
CommonAppConfig
().
common_logger
)
self
.
vpc_api
=
vpc_api
if
isinstance
(
vpc_api_for_get_vpc_ip
,
str
)
and
vpc_api_for_get_vpc_ip
:
self
.
vpc_api_for_get_vpc_ip
=
[
vpc_api_for_get_vpc_ip
]
elif
isinstance
(
vpc_api_for_get_vpc_ip
,
list
):
self
.
vpc_api_for_get_vpc_ip
=
vpc_api_for_get_vpc_ip
else
:
self
.
vpc_api_for_get_vpc_ip
=
list
()
self
.
vpc_api_for_get_vpc_ip
=
[
x
for
x
in
self
.
vpc_api_for_get_vpc_ip
if
x
!=
""
]
self
.
app_utils
=
app_utils
self
.
http_utils
=
http_utils
self
.
incoming_request
=
incoming_request
if
not
http_utils
:
self
.
http_utils
=
HttpUtils
(
incoming_request
=
incoming_request
,
incoming_request_headers
=
kwargs
.
get
(
"incoming_request_headers"
),
logger
=
kwargs
.
get
(
"logger"
)
or
CommonAppConfig
().
common_logger
,
)
self
.
logger
=
kwargs
.
get
(
"logger"
)
or
CommonAppConfig
().
common_logger
def
get_vpc_ip
(
self
,
api_version
:
str
,
vpc_name
:
str
,
clb_area
:
str
=
None
,
region_code
:
str
=
None
,
cookies
=
None
,
)
->
List
[
str
]:
"""
通过vpc_name获取vpc_ip地址
:param api_version: api版本,支持 v1,v2,v3,v4
:param vpc_name: vpc名称
:param clb_area: 区域,示例 access , v1版本可不提供,默认为 access ,后续版本必须提供
:param region_code: 区域, v2版本使用,
:param cookies: 通过 Flask.request.cookies获取到的Cookies字典即可, v2版本使用
"""
if
api_version
==
"v1"
:
return
self
.
_get_vpc_ip_v1
(
vpc_name
=
vpc_name
,
clb_area
=
clb_area
,
)
elif
api_version
==
"v2"
:
return
self
.
_get_vpc_ip_v2
(
vpc_name
=
vpc_name
,
clb_area
=
clb_area
,
region_code
=
region_code
,
cookies
=
cookies
,
)
elif
api_version
==
"v3"
:
return
self
.
_get_vpc_ip_v3
(
vpc_name
=
vpc_name
,
clb_area
=
clb_area
or
"access"
,
)
elif
api_version
==
"v4"
:
return
self
.
_get_vpc_ip_v4
(
vpc_name
=
vpc_name
,
clb_area
=
clb_area
or
"access"
,
)
else
:
raise
ValueError
(
f
"获取vpc vip时接收了不支持的版本:
{
api_version
}
"
)
def
_get_vpc_ip_v1
(
self
,
vpc_name
:
str
,
clb_area
:
str
=
None
,
)
->
List
[
str
]:
"""
通过vpc_name获取vpc_ip地址, v1版本
:param vpc_name: vpc名称
:param clb_area: 区域,示例 access , v1版本可不提供,默认为 access
"""
if
not
self
.
vpc_api_for_get_vpc_ip
:
raise
ValueError
(
"获取vpcip v1时未提供vpc api域名或IP地址和端口"
)
api_path
=
"/v1/spider/lb/vip/get"
len_vpc_hosts
=
len
(
self
.
vpc_api_for_get_vpc_ip
)
max_index
=
len_vpc_hosts
-
1
for
tmp_index
in
range
(
len_vpc_hosts
):
tmp_vpc_api_for_get_vpc_ip
=
self
.
vpc_api_for_get_vpc_ip
[
tmp_index
]
request_url
=
f
"
{
tmp_vpc_api_for_get_vpc_ip
}{
api_path
}
"
params
=
{
"vpcTenant"
:
vpc_name
}
if
clb_area
:
params
[
"clbArea"
]
=
clb_area
do_http_info
=
f
"获取vpcip v1 api,get url:
{
request_url
}
,query:
{
self
.
app_utils
.
dict2jsonstr
(
params
)
}
"
resp
=
self
.
http_utils
.
do_http_get_with_log_time
(
do_http_info
=
do_http_info
,
url
=
request_url
,
params
=
params
,
verify
=
False
,
)
if
resp
.
status_code
!=
200
:
if
tmp_index
<
max_index
:
continue
else
:
raise
AppRuntimeException
(
message
=
f
'获取vpcip v1时api返回状态码为
{
resp
.
status_code
}
'
,
detail
=
f
"
{
do_http_info
}
"
)
resp_dict
=
self
.
app_utils
.
jsonstr2dict
(
resp
.
text
)
status
=
resp_dict
.
get
(
"status"
)
if
status
!=
200
:
if
tmp_index
<
max_index
:
continue
else
:
raise
AppRuntimeException
(
message
=
f
'获取vpcip v1时api返回内容的为
{
resp
.
text
}
'
,
detail
=
f
"
{
do_http_info
}
"
)
# 处理数据,返回ip
return
resp_dict
.
get
(
"value"
)
raise
AppRuntimeException
(
message
=
f
'获取vpcip v1时执行到异常位置'
,
detail
=
f
'获取vpcip v1时执行到异常位置'
,
)
def
_get_vpc_ip_v2
(
self
,
vpc_name
:
str
,
clb_area
:
str
,
region_code
:
str
,
cookies
,
)
->
List
[
str
]:
"""
通过vpc_name获取vpc_ip地址
:param vpc_name: vpc名称
:param clb_area: 区域,示例 access , 必须提供
:param region_code: 区域编码,
:param cookies: 通过 Flask.request.cookies获取到的Cookies字典即可,
"""
if
not
self
.
vpc_api_for_get_vpc_ip
:
raise
ValueError
(
"获取vpcip v2时未提供vpc api域名或IP地址和端口"
)
if
not
clb_area
:
raise
ValueError
(
"获取vpcip v2时没有提供clb_area"
)
if
not
region_code
:
raise
ValueError
(
"获取vpcip v2时没有提供region_code"
)
if
not
cookies
:
raise
ValueError
(
"获取vpcip v2时没有提供cookies"
)
api_path
=
"/v2/apiserver/spider/lb/vip/get"
len_vpc_hosts
=
len
(
self
.
vpc_api_for_get_vpc_ip
)
max_index
=
len_vpc_hosts
-
1
for
tmp_index
in
range
(
len_vpc_hosts
):
tmp_vpc_api_for_get_vpc_ip
=
self
.
vpc_api_for_get_vpc_ip
[
tmp_index
]
request_url
=
f
"
{
tmp_vpc_api_for_get_vpc_ip
}{
api_path
}
"
params
=
{
"vpcTenant"
:
vpc_name
,
"clbArea"
:
clb_area
,
"regionId"
:
region_code
}
do_http_info
=
f
"获取vpcip v2 api,get url:
{
request_url
}
,query:
{
self
.
app_utils
.
dict2jsonstr
(
params
)
}
,cookies:
{
cookies
}
"
resp
=
self
.
http_utils
.
do_http_get_with_log_time
(
do_http_info
=
do_http_info
,
url
=
request_url
,
params
=
params
,
cookies
=
cookies
,
verify
=
False
,
)
if
resp
.
status_code
!=
200
:
if
tmp_index
<
max_index
:
continue
else
:
raise
AppRuntimeException
(
message
=
f
'获取vpcip v2时api返回状态码为
{
resp
.
status_code
}
'
,
detail
=
f
"
{
do_http_info
}
"
)
resp_dict
=
self
.
app_utils
.
jsonstr2dict
(
resp
.
text
)
status
=
resp_dict
.
get
(
"status"
)
if
status
!=
200
:
if
tmp_index
<
max_index
:
continue
else
:
raise
AppRuntimeException
(
message
=
f
'获取vpcip v2时api返回内容的为
{
resp
.
text
}
'
,
detail
=
f
"
{
do_http_info
}
"
)
# 处理数据,返回ip
return
resp_dict
.
get
(
"value"
)
raise
AppRuntimeException
(
message
=
f
'获取vpcip v2时执行到异常位置'
,
detail
=
f
'获取vpcip v2时执行到异常位置'
,
)
def
_get_vpc_ip_v3
(
self
,
vpc_name
:
str
,
clb_area
:
str
,
)
->
List
[
str
]:
"""
通过vpc_name获取vpc_ip地址, v3版本,对应于目标云获取vip(不验证登录的方式)
:param vpc_name: vpc名称
:param clb_area: 区域,示例 access , 必须提供
"""
if
not
self
.
vpc_api_for_get_vpc_ip
:
raise
ValueError
(
"获取vpcip v3时未提供vpc api域名或IP地址和端口"
)
if
not
clb_area
:
raise
ValueError
(
"获取vpcip v3时没有提供clb_area"
)
api_path
=
f
"/v1/clb/
{
vpc_name
}
/
{
clb_area
}
/"
len_vpc_hosts
=
len
(
self
.
vpc_api_for_get_vpc_ip
)
max_index
=
len_vpc_hosts
-
1
for
tmp_index
in
range
(
len_vpc_hosts
):
tmp_vpc_api_for_get_vpc_ip
=
self
.
vpc_api_for_get_vpc_ip
[
tmp_index
]
request_url
=
f
"
{
tmp_vpc_api_for_get_vpc_ip
}{
api_path
}
"
do_http_info
=
f
"获取vpcip v3 api,get url:
{
request_url
}
"
resp
=
self
.
http_utils
.
do_http_get_with_log_time
(
do_http_info
=
do_http_info
,
url
=
request_url
,
verify
=
False
,
)
if
resp
.
status_code
!=
200
:
if
tmp_index
<
max_index
:
continue
else
:
raise
AppRuntimeException
(
message
=
f
'获取vpcip v3时api返回状态码为
{
resp
.
status_code
}
'
,
detail
=
f
"
{
do_http_info
}
"
)
resp_dict
=
self
.
app_utils
.
jsonstr2dict
(
resp
.
text
)
error
=
resp_dict
.
get
(
"error"
)
if
error
:
if
tmp_index
<
max_index
:
continue
else
:
raise
AppRuntimeException
(
message
=
f
'获取vpcip v3时api返回内容的为
{
resp
.
text
}
'
,
detail
=
f
"
{
do_http_info
}
"
)
# 处理数据,返回ip
return
resp_dict
.
get
(
"vip"
)
raise
AppRuntimeException
(
message
=
f
'获取vpcip v3时执行到异常位置'
,
detail
=
f
'获取vpcip v3时执行到异常位置'
,
)
def
_get_vpc_ip_v4
(
self
,
vpc_name
:
str
,
clb_area
:
str
=
None
,
)
->
List
[
str
]:
"""
通过vpc_name获取vpc_ip地址, v4版本,对应于contiv、ovn并行环境(不验证登录)
:param vpc_name: vpc名称
:param clb_area: 区域,示例 access ,必须提供
"""
if
not
self
.
vpc_api_for_get_vpc_ip
:
raise
ValueError
(
"获取vpcip v4时未提供vpc api域名或IP地址和端口"
)
if
not
clb_area
:
raise
ValueError
(
"获取vpcip v4时没有提供clb_area"
)
api_path
=
"/v1/spider/listen/vip/get"
len_vpc_hosts
=
len
(
self
.
vpc_api_for_get_vpc_ip
)
max_index
=
len_vpc_hosts
-
1
for
tmp_index
in
range
(
len_vpc_hosts
):
tmp_vpc_api_for_get_vpc_ip
=
self
.
vpc_api_for_get_vpc_ip
[
tmp_index
]
request_url
=
f
"
{
tmp_vpc_api_for_get_vpc_ip
}{
api_path
}
"
params
=
{
"vpc"
:
vpc_name
,
"clbArea"
:
clb_area
}
do_http_info
=
f
"获取vpcip v4 api,get url:
{
request_url
}
,query:
{
self
.
app_utils
.
dict2jsonstr
(
params
)
}
"
resp
=
self
.
http_utils
.
do_http_get_with_log_time
(
do_http_info
=
do_http_info
,
url
=
request_url
,
params
=
params
,
verify
=
False
,
)
if
resp
.
status_code
!=
200
:
if
tmp_index
<
max_index
:
continue
else
:
raise
AppRuntimeException
(
message
=
f
'获取vpcip v4时api返回状态码为
{
resp
.
status_code
}
'
,
detail
=
f
"
{
do_http_info
}
"
)
resp_dict
=
self
.
app_utils
.
jsonstr2dict
(
resp
.
text
)
status
:
bool
=
resp_dict
.
get
(
"status"
)
if
status
is
not
True
:
if
tmp_index
<
max_index
:
continue
else
:
raise
AppRuntimeException
(
message
=
f
'获取vpcip v4时api返回内容的为
{
resp
.
text
}
'
,
detail
=
f
"
{
do_http_info
}
"
)
# 处理数据,返回ip
return
resp_dict
.
get
(
"value"
)
raise
AppRuntimeException
(
message
=
f
'获取vpcip v4时执行到异常位置'
,
detail
=
f
'获取vpcip v4时执行到异常位置'
,
)
def
get_all_vpc_ip
(
self
,
api_version
:
str
,
clb_area
:
str
=
None
,
)
->
List
[
Dict
[
str
,
str
]]:
"""
获取所有vpc_ip地址,
返回的每个对象为字典,{"alias":"","vip":"","Vpcname":""}
:param api_version: api版本,支持 v3
:param clb_area: 区域,示例 access
"""
if
api_version
==
"v3"
:
return
self
.
_get_all_vpc_ip_v3
(
clb_area
=
clb_area
,
)
else
:
raise
ValueError
(
f
"获取all_vpcip时接收了不支持的版本:
{
api_version
}
"
)
def
_get_all_vpc_ip_v3
(
self
,
clb_area
:
str
=
None
,
)
->
List
[
Dict
[
str
,
str
]]:
"""
获取所有vpc_ip地址, v3版本,对应于目标云获取vip(不验证登录的方式),
返回的每个对象为字典,{"alias":"","vip":"","Vpcname":""}
:param clb_area: 区域,示例 access , v1版本可不提供,默认为 access
"""
if
not
self
.
vpc_api_for_get_vpc_ip
:
raise
ValueError
(
"获取all_vpcip v3时未提供vpc api域名或IP地址和端口"
)
if
not
clb_area
:
raise
ValueError
(
"获取all_vpcip v3时没有提供clb_area"
)
api_path
=
f
"/v1/allclb/
{
clb_area
}
/"
len_vpc_hosts
=
len
(
self
.
vpc_api_for_get_vpc_ip
)
max_index
=
len_vpc_hosts
-
1
for
tmp_index
in
range
(
len_vpc_hosts
):
tmp_vpc_api_for_get_vpc_ip
=
self
.
vpc_api_for_get_vpc_ip
[
tmp_index
]
request_url
=
f
"
{
tmp_vpc_api_for_get_vpc_ip
}{
api_path
}
"
do_http_info
=
f
"获取vpcip v3 api,get url:
{
request_url
}
"
resp
=
self
.
http_utils
.
do_http_get_with_log_time
(
do_http_info
=
do_http_info
,
url
=
request_url
,
verify
=
False
,
)
if
resp
.
status_code
!=
200
:
if
tmp_index
<
max_index
:
continue
else
:
raise
AppRuntimeException
(
message
=
f
'获取all_vpcip v3时api返回状态码为
{
resp
.
status_code
}
'
,
detail
=
f
"
{
do_http_info
}
"
)
resp_dict
=
self
.
app_utils
.
jsonstr2dict
(
resp
.
text
)
error
=
resp_dict
.
get
(
"error"
)
if
error
:
if
tmp_index
<
max_index
:
continue
else
:
raise
AppRuntimeException
(
message
=
f
'获取all_vpcip v3时api返回内容的为
{
resp
.
text
}
'
,
detail
=
f
"
{
do_http_info
}
"
)
# 处理数据,返回对象字典
return
resp_dict
.
get
(
"clbs"
)
raise
AppRuntimeException
(
message
=
f
'获取all_vpcip v3时执行到异常位置'
,
detail
=
f
'获取all_vpcip v3时执行到异常位置'
,
)
def
get_vpc_used_ports
(
self
,
api_version
:
str
,
vpc_name
:
str
,
clb_area
:
str
=
None
,
region_code
:
str
=
None
,
cookies
=
None
,
)
->
List
[
str
]:
"""
通过vpc_name获取vpc已用端口
:param api_version: api版本,支持 v1,v2,v3,v4
:param vpc_name: vpc名称
:param clb_area: 区域,示例 access , v1版本可不提供,默认为 access ,后续版本必须提供
:param region_code: 区域, v2版本使用,
:param cookies: 通过 Flask.request.cookies获取到的Cookies字典即可, v2版本使用
"""
if
api_version
==
"v1"
:
return
self
.
_get_vpc_used_ports_v1
(
vpc_name
=
vpc_name
,
clb_area
=
clb_area
,
)
elif
api_version
==
"v2"
:
return
self
.
_get_vpc_used_ports_v2
(
vpc_name
=
vpc_name
,
clb_area
=
clb_area
,
region_code
=
region_code
,
cookies
=
cookies
,
)
elif
api_version
==
"v3"
:
return
self
.
_get_vpc_used_ports_v3
(
vpc_name
=
vpc_name
,
clb_area
=
clb_area
or
"access"
,
)
elif
api_version
==
"v4"
:
return
self
.
_get_vpc_used_ports_v4
(
vpc_name
=
vpc_name
,
clb_area
=
clb_area
or
"access"
,
)
else
:
raise
ValueError
(
f
"获取vpc已用端口时接收了不支持的版本:
{
api_version
}
"
)
def
_get_vpc_used_ports_v1
(
self
,
vpc_name
:
str
,
clb_area
:
str
=
None
,
)
->
List
[
str
]:
"""
通过vpc_name获取vpc已用端口, v1版本
:param vpc_name: vpc名称
:param clb_area: 区域,示例 access , v1版本可不提供,默认为 access
"""
if
not
self
.
vpc_api_for_get_vpc_ip
:
raise
ValueError
(
"获取vpc已用端口v1时未提供vpc api域名或IP地址和端口"
)
api_path
=
"/v1/spider/lb/vport/get"
len_vpc_hosts
=
len
(
self
.
vpc_api_for_get_vpc_ip
)
max_index
=
len_vpc_hosts
-
1
for
tmp_index
in
range
(
len_vpc_hosts
):
tmp_vpc_api_for_get_vpc_ip
=
self
.
vpc_api_for_get_vpc_ip
[
tmp_index
]
request_url
=
f
"
{
tmp_vpc_api_for_get_vpc_ip
}{
api_path
}
"
params
=
{
"vpcTenant"
:
vpc_name
}
if
clb_area
:
params
[
"clbArea"
]
=
clb_area
do_http_info
=
f
"获取vpc已用端口 v1 api,get url:
{
request_url
}
,query:
{
self
.
app_utils
.
dict2jsonstr
(
params
)
}
"
resp
=
self
.
http_utils
.
do_http_get_with_log_time
(
do_http_info
=
do_http_info
,
url
=
request_url
,
params
=
params
,
verify
=
False
,
)
if
resp
.
status_code
!=
200
:
if
tmp_index
<
max_index
:
continue
else
:
raise
AppRuntimeException
(
message
=
f
'获取vpc已用端口v1时api返回状态码为
{
resp
.
status_code
}
'
,
detail
=
f
"
{
do_http_info
}
"
)
resp_dict
=
self
.
app_utils
.
jsonstr2dict
(
resp
.
text
)
status
=
resp_dict
.
get
(
"status"
)
if
status
!=
200
:
if
tmp_index
<
max_index
:
continue
else
:
raise
AppRuntimeException
(
message
=
f
'获取vpc已用端口v1时api返回内容的为
{
resp
.
text
}
'
,
detail
=
f
"
{
do_http_info
}
"
)
# 处理数据,返回已用端口列表
is_use_vport_list
=
resp_dict
.
get
(
"value"
)
or
list
()
is_use_vport_list
.
sort
()
return
is_use_vport_list
raise
AppRuntimeException
(
message
=
f
'获取vpc已用端口v1时执行到异常位置'
,
detail
=
f
'获取vpc已用端口v1时执行到异常位置'
,
)
def
_get_vpc_used_ports_v2
(
self
,
vpc_name
:
str
,
clb_area
:
str
,
region_code
:
str
,
cookies
,
)
->
List
[
str
]:
"""
通过vpc_name获取vpc已用端口,v2版本
:param vpc_name: vpc名称
:param clb_area: 区域,示例 access ,必须提供
:param region_code: 区域,
:param cookies: 通过 Flask.request.cookies获取到的Cookies字典即可,
"""
if
not
self
.
vpc_api_for_get_vpc_ip
:
raise
ValueError
(
"获取vpc已用端口v2时未提供vpc api域名或IP地址和端口"
)
if
not
clb_area
:
raise
ValueError
(
"获取vpc已用端口v2时没有提供clb_area"
)
if
not
region_code
:
raise
ValueError
(
"获取vpc已用端口v2时没有提供region_code"
)
if
not
cookies
:
raise
ValueError
(
"获取vpc已用端口v2时没有提供cookies"
)
api_path
=
"/v2/apiserver/spider/lb/vport/get"
len_vpc_hosts
=
len
(
self
.
vpc_api_for_get_vpc_ip
)
max_index
=
len_vpc_hosts
-
1
for
tmp_index
in
range
(
len_vpc_hosts
):
tmp_vpc_api_for_get_vpc_ip
=
self
.
vpc_api_for_get_vpc_ip
[
tmp_index
]
request_url
=
f
"
{
tmp_vpc_api_for_get_vpc_ip
}{
api_path
}
"
params
=
{
"vpcTenant"
:
vpc_name
,
"clbArea"
:
clb_area
,
"regionId"
:
region_code
}
do_http_info
=
f
"获取vpc已用端口 v2 api,get url:
{
request_url
}
,query:
{
self
.
app_utils
.
dict2jsonstr
(
params
)
}
,cookies:
{
cookies
}
"
resp
=
self
.
http_utils
.
do_http_get_with_log_time
(
do_http_info
=
do_http_info
,
url
=
request_url
,
params
=
params
,
cookies
=
cookies
,
verify
=
False
,
)
if
resp
.
status_code
!=
200
:
if
tmp_index
<
max_index
:
continue
else
:
raise
AppRuntimeException
(
message
=
f
'获取vpc已用端口v2时api返回状态码为
{
resp
.
status_code
}
'
,
detail
=
f
"
{
do_http_info
}
"
)
resp_dict
=
self
.
app_utils
.
jsonstr2dict
(
resp
.
text
)
status
=
resp_dict
.
get
(
"status"
)
if
status
!=
200
:
if
tmp_index
<
max_index
:
continue
else
:
raise
AppRuntimeException
(
message
=
f
'获取vpc已用端口v2时api返回内容的为
{
resp
.
text
}
'
,
detail
=
f
"
{
do_http_info
}
"
)
# 处理数据,返回已用端口列表
is_use_vport_list
=
resp_dict
.
get
(
"value"
)
or
list
()
is_use_vport_list
.
sort
()
return
is_use_vport_list
raise
AppRuntimeException
(
message
=
f
'获取vpc已用端口v2时执行到异常位置'
,
detail
=
f
'获取vpc已用端口v2时执行到异常位置'
,
)
def
_get_vpc_used_ports_v3
(
self
,
vpc_name
:
str
,
clb_area
:
str
,
)
->
List
[
str
]:
"""
通过vpc_name获取vpc已用端口,v3版本,对应于目标云获取vip(不验证登录的方式)
:param vpc_name: vpc名称
:param clb_area: 区域,示例 access ,必须提供
"""
if
not
self
.
vpc_api_for_get_vpc_ip
:
raise
ValueError
(
"获取vpc已用端口v3时未提供vpc api域名或IP地址和端口"
)
if
not
clb_area
:
raise
ValueError
(
"获取vpc已用端口v3时没有提供clb_area"
)
api_path
=
f
"/v1/clbPort/
{
vpc_name
}
/
{
clb_area
}
/"
len_vpc_hosts
=
len
(
self
.
vpc_api_for_get_vpc_ip
)
max_index
=
len_vpc_hosts
-
1
for
tmp_index
in
range
(
len_vpc_hosts
):
tmp_vpc_api_for_get_vpc_ip
=
self
.
vpc_api_for_get_vpc_ip
[
tmp_index
]
request_url
=
f
"
{
tmp_vpc_api_for_get_vpc_ip
}{
api_path
}
"
do_http_info
=
f
"获取vpc已用端口 v3 api,get url:
{
request_url
}
"
resp
=
self
.
http_utils
.
do_http_get_with_log_time
(
do_http_info
=
do_http_info
,
url
=
request_url
,
verify
=
False
,
)
if
resp
.
status_code
!=
200
:
if
tmp_index
<
max_index
:
continue
else
:
raise
AppRuntimeException
(
message
=
f
'获取vpc已用端口v3时api返回状态码为
{
resp
.
status_code
}
'
,
detail
=
f
"
{
do_http_info
}
"
)
resp_dict
=
self
.
app_utils
.
jsonstr2dict
(
resp
.
text
)
error
=
resp_dict
.
get
(
"error"
)
if
error
:
if
tmp_index
<
max_index
:
continue
else
:
raise
AppRuntimeException
(
message
=
f
'获取vpc已用端口v3时api返回内容的为
{
resp
.
text
}
'
,
detail
=
f
"
{
do_http_info
}
"
)
# 处理数据,返回已用端口列表
is_use_vport_list
=
resp_dict
.
get
(
"ports"
)
or
list
()
is_use_vport_list
.
sort
()
return
is_use_vport_list
raise
AppRuntimeException
(
message
=
f
'获取vpc已用端口v3时执行到异常位置'
,
detail
=
f
'获取vpc已用端口v3时执行到异常位置'
,
)
def
_get_vpc_used_ports_v4
(
self
,
vpc_name
:
str
,
clb_area
:
str
=
None
,
)
->
List
[
str
]:
"""
通过vpc_name获取vpc已用端口,v3版本,对应于contiv、ovn并行环境(不验证登录)
:param vpc_name: vpc名称
:param clb_area: 区域,示例 access , 必须提供
"""
if
not
self
.
vpc_api_for_get_vpc_ip
:
raise
ValueError
(
"获取vpc已用端口v4时未提供vpc api域名或IP地址和端口"
)
api_path
=
"/v1/spider/listen/vport/get"
len_vpc_hosts
=
len
(
self
.
vpc_api_for_get_vpc_ip
)
max_index
=
len_vpc_hosts
-
1
for
tmp_index
in
range
(
len_vpc_hosts
):
tmp_vpc_api_for_get_vpc_ip
=
self
.
vpc_api_for_get_vpc_ip
[
tmp_index
]
request_url
=
f
"
{
tmp_vpc_api_for_get_vpc_ip
}{
api_path
}
"
params
=
{
"vpc"
:
vpc_name
,
"clbarea"
:
clb_area
}
do_http_info
=
f
"获取vpc已用端口 v4 api,get url:
{
request_url
}
,query:
{
self
.
app_utils
.
dict2jsonstr
(
params
)
}
"
resp
=
self
.
http_utils
.
do_http_get_with_log_time
(
do_http_info
=
do_http_info
,
url
=
request_url
,
params
=
params
,
verify
=
False
,
)
if
resp
.
status_code
!=
200
:
if
tmp_index
<
max_index
:
continue
else
:
raise
AppRuntimeException
(
message
=
f
'获取vpc已用端口v4时api返回状态码为
{
resp
.
status_code
}
'
,
detail
=
f
"
{
do_http_info
}
"
)
resp_dict
=
self
.
app_utils
.
jsonstr2dict
(
resp
.
text
)
status
:
bool
=
resp_dict
.
get
(
"status"
)
if
status
is
not
True
:
if
tmp_index
<
max_index
:
continue
else
:
raise
AppRuntimeException
(
message
=
f
'获取vpc已用端口v4时api返回内容的为
{
resp
.
text
}
'
,
detail
=
f
"
{
do_http_info
}
"
)
# 处理数据,返回已用端口列表
is_use_vport_list
=
resp_dict
.
get
(
"value"
)
or
list
()
is_use_vport_list
.
sort
()
return
is_use_vport_list
raise
AppRuntimeException
(
message
=
f
'获取vpc已用端口v4时执行到异常位置'
,
detail
=
f
'获取vpc已用端口v4时执行到异常位置'
,
)
cucc_common_pkg/ludq_utils/utils_v2.py
0 → 100644
View file @
11b2601e
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
"""
@Version: 1.0
@Python Version:3.6.6
@Author: ludq1
@Email: ludq1@chinaunicom.cn
@date: 2023/04/07 11:40:00
@Description:
"""
from
typing
import
List
from
.utils_base
import
UtilityBaseV2
from
.utils_mesh_mng.cluster_istio
import
ClusterIstio
,
EnumClusterSource
from
.utils_mesh_mng.csm_instance
import
CsmInstance
from
.utils_mesh_mng.csm_instance_cluster
import
CsmInstanceCluster
try
:
import
thread
except
ImportError
:
import
_thread
as
thread
class
UtilityV2
(
UtilityBaseV2
):
r
"""
便捷功能类,不依赖任何类的应用的便捷功能类
"""
def
gen_all_cluster_istios
(
self
,
csm_instance
:
CsmInstance
)
->
List
[
ClusterIstio
]:
r
"""
生成实例的所有cluster_istios列表
:param csm_instance:
"""
result_list
=
list
()
result_list
.
append
(
csm_instance
.
cluster_istio
)
if
csm_instance
.
other_clusters
:
result_list
.
extend
(
[
tmp_instance_cluster
.
cluster_istio
for
tmp_instance_cluster
in
csm_instance
.
other_clusters
]
)
return
result_list
def
is_instance_contain_platform_cluster
(
self
,
csm_instance
:
CsmInstance
,
only_judge_main_cluster
:
bool
=
False
,
only_judge_instance_clusters
:
bool
=
False
,
)
->
bool
:
r
"""
判断csm实例是否包含集群提供的集群
:param csm_instance:
:param only_judge_instance_clusters:
:param only_judge_main_cluster:
"""
if
only_judge_main_cluster
:
if
csm_instance
.
cluster_istio
.
cluster_source
==
EnumClusterSource
.
PLATFORM
.
value
:
return
True
return
False
elif
only_judge_instance_clusters
:
all_cluster_istios
=
list
()
if
csm_instance
.
other_clusters
:
all_cluster_istios
.
extend
(
[
tmp_instance_cluster
.
cluster_istio
for
tmp_instance_cluster
in
csm_instance
.
other_clusters
]
)
for
tmp_cluster_istio
in
all_cluster_istios
:
if
tmp_cluster_istio
.
cluster_source
==
EnumClusterSource
.
PLATFORM
.
value
:
return
True
return
False
else
:
all_cluster_istios
=
self
.
gen_all_cluster_istios
(
csm_instance
)
for
tmp_cluster_istio
in
all_cluster_istios
:
if
tmp_cluster_istio
.
cluster_source
==
EnumClusterSource
.
PLATFORM
.
value
:
return
True
return
False
def
is_instance_cluster_contain_platform_cluster
(
self
,
instance_cluster
:
CsmInstanceCluster
)
->
bool
:
r
"""
判断instance_cluster是否包含集群提供的集群
:param instance_cluster:
"""
if
instance_cluster
.
cluster_istio
.
cluster_source
==
EnumClusterSource
.
PLATFORM
.
value
:
return
True
return
False
cucc_common_pkg/ludq_utils/utils_webservice/__init__.py
0 → 100644
View file @
11b2601e
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
"""
@Version: 1.0
@Python Version:3.6.6
@Author: ludq1
@Email: ludq1@chinaunicom.cn
@date: 2023/04/07 11:40:00
@Description:
"""
Prev
1
2
3
4
5
6
7
8
Next
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment