#!/usr/bin/env python # -*- encoding: utf-8 -*- """ @Version: 1.0 @Python Version:3.6.6 @Author: ludq1 @Email: ludq1@chinaunicom.cn @date: 2023/04/07 11:40:00 @Description: """ import os import shutil import subprocess import tempfile from typing import Tuple from ..globalutility import Utility class GenCertResult: root_key_content: str = None root_key_filepath: str = None root_cert_content: str = None root_cert_filepath: str = None ca_key_content: str = None ca_key_filepath: str = None ca_cert_content: str = None ca_cert_filepath: str = None cert_chain_content: str = None def __init__(self, **kwargs): r""" :param kwargs: :key root_key_content: :key root_key_filepath: :key root_cert_content: :key root_cert_filepath: :key ca_key_content: :key ca_key_filepath: :key ca_cert_content: :key ca_cert_filepath: :key cert_chain_content: """ self.root_key_content = kwargs.get("root_key_content") self.root_key_filepath = kwargs.get("root_key_filepath") self.root_cert_content = kwargs.get("root_cert_content") self.root_cert_filepath = kwargs.get("root_cert_filepath") self.ca_key_content = kwargs.get("ca_key_content") self.ca_key_filepath = kwargs.get("ca_key_filepath") self.ca_cert_content = kwargs.get("ca_cert_content") self.ca_cert_filepath = kwargs.get("ca_cert_filepath") self.cert_chain_content = kwargs.get("cert_chain_content") def __str__(self): return Utility.dict2jsonstr(self.__dict__) class CertUtils: r""" 证书相关的Utils类 """ def gen_root_key_and_cert_and_cluster_key_and_cert( self, keysize: int = 4096, content_format: str = "pem", **kwargs ) -> GenCertResult: r""" 一次性生成根证书key和cert文件内容和生成中间证书key和cert文件内容 :param keysize: 默认值 4096 :param content_format: 默认值 pem :param kwargs: :key rootca_org: 默认值 Istio :key rootca_cn: 默认值 Root CA :key rootca_days: 默认值 36500 :key intermediate_cluster_name: 必须提供, :key dns_names: 默认值 [istiod.istio-system.svc] :key ip_addresses: 默认值 [] :key intermediate_org: 默认值 Istio :key intermediate_cn: 默认值 Intermediate CA :key intermediate_days: 默认值 36500 :key dest_dir: 生成的文件存放目录, 不提供时使用临时目录, :key del_dir: 生成文件完成后,是否删除文件夹, 默认值 True """ dest_dir = kwargs.get("dest_dir") or None if not dest_dir: dest_dir = tempfile.mkdtemp() del_dir = kwargs.get("del_dir") if del_dir is None: del_dir = True kwargs["dest_dir"] = dest_dir kwargs["del_dir"] = False # 首先生成 root-key.pem 和 root-cert.pem result1 = self.gen_root_key_and_cert(keysize=keysize, content_format=content_format, **kwargs) # 然后生成 ca-key.pem 和 ca-cert.pem result2 = self.gen_cluster_key_and_cert( keysize=keysize, content_format=content_format, root_key=result1.root_key_filepath, root_cert=result1.root_cert_filepath, **kwargs ) if del_dir: shutil.rmtree(dest_dir, ignore_errors=True) return result2 def gen_root_key_and_cert(self, keysize: int = 4096, content_format: str = "pem", **kwargs) -> GenCertResult: r""" 一次性生成根证书key和cert文件内容 :param keysize: 默认值 4096 :param content_format: 默认值 pem :param kwargs: :key rootca_org: 默认值 Istio :key rootca_cn: 默认值 Root CA :key rootca_days: 默认值 36500 :key dest_dir: 生成的文件存放目录, 不提供时使用临时目录, :key del_dir: 生成文件完成后,是否删除文件夹, 默认值 True """ dest_dir = kwargs.get("dest_dir") or None if not dest_dir: dest_dir = tempfile.mkdtemp() del_dir = kwargs.get("del_dir") if del_dir is None: del_dir = True kwargs["dest_dir"] = dest_dir kwargs["del_dir"] = False # 首先生成 root-key.pem root_key_content, root_key_filepath = self.gen_key( keysize=keysize, content_format=content_format, dest_dir=dest_dir, del_dir=False, key_file_name="root-key", ) # 然后生成 root-cert.pem root_cert_content, root_cert_filepath = self.gen_root_cert( root_key=root_key_filepath, keysize=keysize, content_format=content_format, **kwargs ) if del_dir: shutil.rmtree(dest_dir, ignore_errors=True) return GenCertResult( root_key_content=root_key_content, root_key_filepath=root_key_filepath, root_cert_content=root_cert_content, root_cert_filepath=root_cert_filepath, ) def gen_cluster_key_and_cert( self, root_key: str, root_cert: str, keysize: int = 4096, content_format: str = "pem", **kwargs ) -> GenCertResult: r""" 一次性生成中间证书key和cert文件内容 :param root_key: root_key文件内容或文件路径 :param root_cert: root_cert文件内容或文件路径 :param keysize: 默认值 4096 :param content_format: 默认值 pem :key intermediate_cluster_name: 必须提供, :key dns_names: 默认值 [istiod.istio-system.svc] :key ip_addresses: 默认值 [] :key intermediate_org: 默认值 Istio :key intermediate_cn: 默认值 Intermediate CA :key intermediate_days: 默认值 36500 :key dest_dir: 生成的文件存放目录, 不提供时使用临时目录, :key del_dir: 生成文件完成后,是否删除文件夹, 默认值 True """ dest_dir = kwargs.get("dest_dir") or None if not dest_dir: dest_dir = tempfile.mkdtemp() del_dir = kwargs.get("del_dir") if del_dir is None: del_dir = True kwargs["dest_dir"] = dest_dir kwargs["del_dir"] = False # 首先生成 ca-key.pem ca_key_content, ca_key_filepath = self.gen_key( keysize=keysize, content_format=content_format, dest_dir=dest_dir, del_dir=False, key_file_name="ca-key", ) # 然后生成 ca-cert.pem ca_cert_content, ca_cert_filepath = self.gen_intermediate_cert( root_key=root_key, root_cert=root_cert, ca_key=ca_key_filepath, keysize=keysize, content_format=content_format, **kwargs ) # 读取 root_key 内容 if os.path.exists(root_key) and os.path.isfile(root_key): root_key_filepath = root_key with open(root_key, 'r', encoding='utf-8') as f: root_key_content = f.read() else: root_key_filepath = os.sep.join([dest_dir, "root-key.pem"]) root_key_content = root_key # 读取 root_cert 内容 if os.path.exists(root_cert) and os.path.isfile(root_cert): root_cert_filepath = root_cert with open(root_cert, 'r', encoding='utf-8') as f: root_cert_content = f.read() else: root_cert_filepath = os.sep.join([dest_dir, "root-cert.pem"]) root_cert_content = root_cert if del_dir: shutil.rmtree(dest_dir, ignore_errors=True) # 生成 cert_chain_content cert_chain_content = f"{ca_cert_content}{root_cert_content}" return GenCertResult( root_key_content=root_key_content, root_key_filepath=root_key_filepath, root_cert_content=root_cert_content, root_cert_filepath=root_cert_filepath, ca_key_content=ca_key_content, ca_key_filepath=ca_key_filepath, ca_cert_content=ca_cert_content, ca_cert_filepath=ca_cert_filepath, cert_chain_content=cert_chain_content, ) def gen_key(self, keysize: int = 4096, content_format: str = "pem", **kwargs) -> Tuple[str, str]: r""" 生成key文件内容,返回 文件内容,文件路径 :param keysize: 默认值 4096 :param content_format: 默认值 pem :param kwargs: :key dest_dir: 生成的文件存放目录, 不提供时使用临时目录, :key del_dir: 生成文件完成后,是否删除文件夹, 默认值 True :key key_file_name: 生成的key文件名,默认为 x-key """ supported_format = {"pem"} if content_format not in supported_format: raise ValueError(f"不支持的格式,目前支持的格式:{supported_format}") dest_dir = kwargs.get("dest_dir") or None if not dest_dir: dest_dir = tempfile.mkdtemp() del_dir = kwargs.get("del_dir") if del_dir is None: del_dir = True key_file_name = kwargs.get("key_file_name") or "x-key" x_key_filepath = os.sep.join([dest_dir, f"{key_file_name}.pem"]) # 使用命令 openssl genrsa -out root-key.pem ${ROOTCA_KEYSZ} subprocess.run(f"openssl genrsa -out {x_key_filepath} {keysize}", shell=True, check=True, timeout=10) # 读取文件内容获取结果 with open(x_key_filepath, 'r', encoding='utf-8') as f: result_content = f.read() # 尝试删除临时文件夹 if del_dir: shutil.rmtree(dest_dir, ignore_errors=True) return result_content, x_key_filepath def gen_root_cert( self, root_key: str, keysize: int = 4096, content_format: str = "pem", **kwargs ) -> Tuple[str, str]: r""" 生成根证书文件内容,返回 文件内容,文件路径 :param root_key: root_key文件内容或文件路径 :param keysize: 默认值 4096 :param content_format: 默认值 pem :key rootca_org: 默认值 Istio :key rootca_cn: 默认值 Root CA :key rootca_days: 默认值 36500 :key dest_dir: 生成的文件存放目录, 不提供时使用临时目录, :key del_dir: 生成文件完成后,是否删除文件夹, 默认值 True """ supported_format = {"pem"} if content_format not in supported_format: raise ValueError(f"不支持的格式,目前支持的格式:{supported_format}") # 设置默认值 rootca_org = kwargs.get("rootca_org") or "Istio" rootca_cn = kwargs.get("rootca_cn") or "Root CA" rootca_days = kwargs.get("rootca_days") or 36500 dest_dir = kwargs.get("dest_dir") or None if not dest_dir: dest_dir = tempfile.mkdtemp() del_dir = kwargs.get("del_dir") if del_dir is None: del_dir = True if os.path.exists(root_key) and os.path.isfile(root_key): root_key_filepath = root_key else: # 写入 root-key文件 root_key_filepath = os.sep.join([dest_dir, "root-key.pem"]) with open(root_key_filepath, 'w', encoding='utf-8') as f: f.write(root_key) # 生成 root-ca.conf root_ca_conf_filepath = os.sep.join([dest_dir, "root-ca.conf"]) with open(root_ca_conf_filepath, 'w', encoding='utf-8') as f: f.write(f"""[ req ] encrypt_key = no prompt = no utf8 = yes default_md = sha256 default_bits = {keysize} req_extensions = req_ext x509_extensions = req_ext distinguished_name = req_dn [ req_ext ] subjectKeyIdentifier = hash basicConstraints = critical, CA:true keyUsage = critical, digitalSignature, nonRepudiation, keyEncipherment, keyCertSign [ req_dn ] O = {rootca_org} CN = {rootca_cn} """) # 生成 root-cert.csr root_cert_csr_filepath = os.sep.join([dest_dir, "root-cert.csr"]) # 使用命令 openssl req -new -key root-key.pem -config root-ca.conf -out root-cert.csr subprocess.run( f"openssl req -new -key {root_key_filepath} -config {root_ca_conf_filepath} -out {root_cert_csr_filepath}", shell=True, check=True, timeout=10) # 生成公钥 root-cert.pem root_cert_filepath = os.sep.join([dest_dir, "root-cert.pem"]) # 使用命令 openssl x509 -req -days ${ROOTCA_DAYS} -signkey root-key.pem -extensions req_ext -extfile root-ca.conf -in root-cert.csr -out root-cert.pem subprocess.run( f"openssl x509 -req -days {rootca_days} -signkey {root_key_filepath} -extensions req_ext -extfile {root_ca_conf_filepath} -in {root_cert_csr_filepath} -out {root_cert_filepath}", shell=True, check=True, timeout=10) # 读取文件内容获取结果 with open(root_cert_filepath, 'r', encoding='utf-8') as f: result_content = f.read() # 尝试删除临时文件夹 if del_dir: shutil.rmtree(dest_dir, ignore_errors=True) return result_content, root_cert_filepath def gen_intermediate_cert( self, root_key: str, root_cert: str, ca_key: str, keysize: int = 4096, content_format: str = "pem", **kwargs) -> Tuple[str, str]: r""" 生成中间证书文件内容,返回 文件内容,文件路径 :param root_key: root_key文件内容或文件路径 :param root_cert: root_cert文件内容或文件路径 :param ca_key: ca_key文件内容或文件路径 :param keysize: 默认值 4096 :param content_format: 默认值 pem :key dns_names: 默认值 [istiod.istio-system.svc] :key ip_addresses: 默认值 [] :key intermediate_org: 默认值 Istio :key intermediate_cn: 默认值 Intermediate CA :key intermediate_cluster_name: 没有默认值 :key intermediate_days: 默认值 36500 :key dest_dir: 生成的文件存放目录, 不提供时使用临时目录, :key del_dir: 生成文件完成后,是否删除文件夹, 默认值 True """ supported_format = {"pem"} if content_format not in supported_format: raise ValueError(f"不支持的格式,目前支持的格式:{supported_format}") # 设置默认值 dns_names = kwargs.get("dns_names") or ["istiod.istio-system.svc"] ip_addresses = kwargs.get("ip_addresses") or None intermediate_org = kwargs.get("intermediate_org") or "Istio" intermediate_cn = kwargs.get("intermediate_cn") or "Intermediate CA" intermediate_days = kwargs.get("intermediate_days") or 36500 intermediate_cluster_name = kwargs.get("intermediate_cluster_name") or "" if not intermediate_cluster_name: raise ValueError(f"未提供intermediate_cluster_name") dest_dir = kwargs.get("dest_dir") or None if not dest_dir: dest_dir = tempfile.mkdtemp() del_dir = kwargs.get("del_dir") if del_dir is None: del_dir = True # 写入 root-key文件 if os.path.exists(root_key) and os.path.isfile(root_key): root_key_filepath = root_key else: # 写入 root-key文件 root_key_filepath = os.sep.join([dest_dir, "root-key.pem"]) with open(root_key_filepath, 'w', encoding='utf-8') as f: f.write(root_key) if os.path.exists(root_cert) and os.path.isfile(root_cert): root_cert_filepath = root_cert else: # 写入 root-cert文件 root_cert_filepath = os.sep.join([dest_dir, "root-cert.pem"]) with open(root_cert_filepath, 'w', encoding='utf-8') as f: f.write(root_cert) if os.path.exists(ca_key) and os.path.isfile(ca_key): ca_key_filepath = ca_key else: # 写入 ca_key 文件 ca_key_filepath = os.sep.join([dest_dir, "ca-key.pem"]) with open(ca_key_filepath, 'w', encoding='utf-8') as f: f.write(ca_key) # 生成 intermediate.conf intermediate_conf_filepath = os.sep.join([dest_dir, "intermediate.conf"]) # 生成 san 内容 dns_and_ip_list = list() if dns_names: for index, dns_name in enumerate(dns_names): dns_and_ip_list.append(f"DNS.{index} = {dns_name}") if ip_addresses: for index, ip_address in enumerate(ip_addresses): dns_and_ip_list.append(f"IP.{index} = {ip_address}") dns_and_ip_joined_str = "\n".join(dns_and_ip_list) with open(intermediate_conf_filepath, 'w', encoding='utf-8') as f: f.write(f"""[ req ] encrypt_key = no prompt = no utf8 = yes default_md = sha256 default_bits = {keysize} req_extensions = req_ext x509_extensions = req_ext distinguished_name = req_dn [ req_ext ] subjectKeyIdentifier = hash basicConstraints = critical, CA:true, pathlen:0 keyUsage = critical, digitalSignature, nonRepudiation, keyEncipherment, keyCertSign subjectAltName=@san [ san ] {dns_and_ip_joined_str} [ req_dn ] O = {intermediate_org} CN = {intermediate_cn} L = {intermediate_cluster_name} """) # 生成 ca-cert.csr ca_cert_csr_filepath = os.sep.join([dest_dir, "ca-cert.csr"]) # 使用命令 openssl req -new -key ca-key.pem -config intermediate.conf -out ca-cert.csr subprocess.run( f"openssl req -new -key {ca_key_filepath} -config {intermediate_conf_filepath} -out {ca_cert_csr_filepath}", shell=True, check=True, timeout=10) # 生成公钥 ca-cert.pem ca_cert_filepath = os.sep.join([dest_dir, "ca-cert.pem"]) # 使用命令 openssl x509 -req -days ${INTERMEDIATE_DAYS} -CA ../root-cert.pem -CAkey ../root-key.pem -CAcreateserial -extensions req_ext -extfile intermediate.conf -in ca-cert.csr -out ca-cert.pem subprocess.run( f"openssl x509 -req -days {intermediate_days} -CA {root_cert_filepath} -CAkey {root_key_filepath} -CAcreateserial -extensions req_ext -extfile {intermediate_conf_filepath} -in {ca_cert_csr_filepath} -out {ca_cert_filepath}", shell=True, check=True, timeout=10) # 读取文件内容获取结果 with open(ca_cert_filepath, 'r', encoding='utf-8') as f: result_content = f.read() # 尝试删除临时文件夹 if del_dir: shutil.rmtree(dest_dir, ignore_errors=True) return result_content, ca_cert_filepath