#!/usr/bin/env python
# -*- encoding: utf-8 -*-
"""
@Version: 1.0
@Python Version:3.6.6
@Author: ludq1
@Email: ludq1@chinaunicom.cn
@date: 2023/04/07 11:40:00
@Description: Helpers for sending notifications by e-mail (SMTP) and SMS (HTTP API).
"""
import logging
import smtplib
from email.header import Header
from email.message import Message
from email.mime.text import MIMEText
from typing import List, Optional, Union

import requests

from ..globalutility import Utility
from ..my_stringutils import MyStringUtils

# Default upper bound, in seconds, for each blocking network operation
# (SMTP socket operations and the SMS HTTP request). Without a bound a
# stalled peer would block the caller indefinitely.
DEFAULT_NETWORK_TIMEOUT = 30


class ToReceiveNoticeUser:
    r"""
    Plain data holder for one notification recipient.

    Does not depend on any other application base class.
    """
    phone: Optional[str] = None
    email: Optional[str] = None

    def __init__(self, source: dict = None, phone: str = None, email: str = None):
        r"""
        Build a recipient from explicit values and/or a ``source`` dict.

        An explicit truthy argument wins; otherwise the value stored in
        ``source`` under the same key (``"phone"`` / ``"email"``) is used.

        :param source: optional dict that may contain ``phone`` and ``email`` keys
        :param phone: phone number
        :param email: e-mail address
        """
        if source is None:
            source = dict()
        self.phone = phone or source.get("phone")
        self.email = email or source.get("email")

    def __str__(self):
        return Utility.dict2jsonstr(self.__dict__)


class NoticeSendResult:
    r"""
    Plain data holder for the outcome of a notification attempt.

    Does not depend on any other application base class.
    """
    email_send_result: Optional[bool] = None
    sms_send_result: Optional[bool] = None

    def __init__(self):
        r"""
        Start with both channels marked as not sent.
        """
        self.email_send_result = False
        self.sms_send_result = False

    def is_all_success(self) -> bool:
        r"""
        Convenience check: truthy only when both e-mail and SMS succeeded.
        """
        return self.email_send_result and self.sms_send_result

    def is_any_success(self) -> bool:
        r"""
        Convenience check: truthy when at least one of e-mail / SMS succeeded.
        """
        return self.email_send_result or self.sms_send_result


class SendEmailConfig:
    r"""
    SMTP settings used for sending e-mail.

    Does not depend on any other application base class.
    """

    def __init__(self, source: dict = None, mail_host: str = None, mail_host_port: Union[str, int] = None,
                 mail_from_account: str = None, mail_from_account_dwp: str = None):
        r"""
        Build the config from explicit values and/or a ``source`` dict.

        An explicit truthy argument wins; otherwise the value stored in
        ``source`` under the same key is used. The port falls back to 25
        when neither supplies one.

        :param source: optional dict that may contain ``mail_host``, ``mail_host_port``,
            ``mail_from_account`` and ``mail_from_account_dwp`` keys
        :param mail_host: SMTP server host
        :param mail_host_port: SMTP server port
        :param mail_from_account: sender account, also used as the login name
        :param mail_from_account_dwp: credential passed to ``SMTP.login`` for the sender account
        """
        if source is None:
            source = dict()
        self.mail_host: Optional[str] = mail_host or source.get("mail_host")
        self.mail_host_port: Union[str, int] = mail_host_port or source.get("mail_host_port") or 25
        self.mail_from_account: Optional[str] = mail_from_account or source.get("mail_from_account")
        self.mail_from_account_dwp: Optional[str] = (
            mail_from_account_dwp or source.get("mail_from_account_dwp")
        )

    def __str__(self):
        return Utility.dict2jsonstr(self.__dict__)


class SendSmsConfig:
    r"""
    Settings for the SMS HTTP API.

    Does not depend on any other application base class.
    """

    def __init__(self, source: dict = None, api_server: str = None, sys_name: str = None, sys_token: str = None):
        r"""
        Build the config from explicit values and/or a ``source`` dict.

        An explicit truthy argument wins; otherwise the value stored in
        ``source`` under the same key is used.

        :param source: optional dict that may contain ``api_server``, ``sys_name`` and ``sys_token`` keys
        :param api_server: URL of the SMS API (the scheme may be omitted)
        :param sys_name: value sent as ``sysName`` in the request body
        :param sys_token: value sent as ``token`` in the request body
        """
        if source is None:
            source = dict()
        self.api_server: Optional[str] = api_server or source.get("api_server")
        self.sys_name: Optional[str] = sys_name or source.get("sys_name")
        self.sys_token: Optional[str] = sys_token or source.get("sys_token")

    def __str__(self):
        return Utility.dict2jsonstr(self.__dict__)


class SendNoticeUtils:
    r"""
    Utility class that sends notifications by e-mail and SMS.

    Does not depend on any other application base class.
    """
    send_email_config: Optional[SendEmailConfig] = None
    send_sms_config: Optional[SendSmsConfig] = None

    def __init__(self, send_email_config: SendEmailConfig = None, send_sms_config: SendSmsConfig = None):
        r"""
        Store the channel configurations.

        :param send_email_config: SMTP settings; e-mail sending fails without it
        :param send_sms_config: SMS API settings; SMS sending fails without it
        """
        self.send_email_config = send_email_config
        self.send_sms_config = send_sms_config

    def gen_to_receive_notice_user_list(self, user_list: list) -> List[ToReceiveNoticeUser]:
        r"""
        Normalize a mixed list into a list of ``ToReceiveNoticeUser``.

        Items that already are ``ToReceiveNoticeUser`` are kept as they are;
        anything else is passed to ``ToReceiveNoticeUser`` as its ``source``.

        :param user_list: recipients, as ``ToReceiveNoticeUser`` objects or dicts
            with ``phone`` / ``email`` keys
        :return: a new list; empty when ``user_list`` is empty or ``None``
        """
        if not user_list:
            return list()
        return [
            item if isinstance(item, ToReceiveNoticeUser) else ToReceiveNoticeUser(item)
            for item in user_list
        ]

    def do_notice(self, to_receive_notice_user_list: list, subject: str = None, notice_content: str = None,
                  add_subject_to_notice_content: bool = False) -> NoticeSendResult:
        r"""
        Notify the given recipients through every channel (e-mail and SMS).

        Both channels are always attempted; a failure of one does not stop
        the other.

        :param to_receive_notice_user_list: recipients, as ``ToReceiveNoticeUser`` objects or dicts
        :param subject: e-mail subject
        :param notice_content: message body
        :param add_subject_to_notice_content: when true, the body actually sent is
            ``subject``, a newline and ``notice_content`` joined together
        :return: per-channel success flags
        """
        recipients = self.gen_to_receive_notice_user_list(to_receive_notice_user_list)
        # A recipient may carry only one kind of contact; skip the missing
        # ones so a single incomplete user cannot break delivery for the rest.
        email_list = [user.email for user in recipients if user.email]
        phone_list = [user.phone for user in recipients if user.phone]

        real_notice_content = notice_content
        if add_subject_to_notice_content:
            real_notice_content = Utility.join_str(subject, '\n', notice_content)

        send_success_for_send_email: bool = self.do_send_email(email_list, subject, real_notice_content)
        send_success_for_send_sms: bool = self.do_send_sms(phone_list, real_notice_content)

        result = NoticeSendResult()
        result.sms_send_result = send_success_for_send_sms
        result.email_send_result = send_success_for_send_email
        return result

    def do_notice_with_any_success(self, to_receive_notice_user_list: list, subject: str = None,
                                   notice_content: str = None,
                                   add_subject_to_notice_content: bool = False) -> bool:
        r"""
        Same as ``do_notice`` but reduced to "did at least one channel succeed".

        :param to_receive_notice_user_list: recipients, as ``ToReceiveNoticeUser`` objects or dicts
        :param subject: e-mail subject
        :param notice_content: message body
        :param add_subject_to_notice_content: see ``do_notice``
        :return: result of ``NoticeSendResult.is_any_success`` for this attempt
        """
        return self.do_notice(to_receive_notice_user_list, subject, notice_content,
                              add_subject_to_notice_content).is_any_success()

    def do_send_email(self, to_mails: list, subject: str = None,
                      notice_content: Union[str, Message] = None,
                      timeout: Optional[float] = DEFAULT_NETWORK_TIMEOUT) -> bool:
        r"""
        Send one e-mail over SMTP using ``send_email_config``.

        Failures are logged and reported through the return value; ordinary
        exceptions are never propagated to the caller.

        :param to_mails: recipient addresses
        :param subject: subject; only applied when ``notice_content`` is not already a ``Message``
        :param notice_content: plain text (wrapped into a UTF-8 ``MIMEText``) or a
            ready-made ``Message`` that is sent as it is
        :param timeout: socket timeout in seconds for the SMTP connection; ``None`` disables it
        :return: ``True`` when the message was handed to the server, ``False`` otherwise
        """
        smtp = None
        try:
            if self.send_email_config is None:
                raise RuntimeError('send_email_config为空')
            mail_host = self.send_email_config.mail_host
            mail_host_port = self.send_email_config.mail_host_port
            mail_from_account = self.send_email_config.mail_from_account
            mail_from_account_dwp = self.send_email_config.mail_from_account_dwp
            if not mail_host:
                raise RuntimeError('send_email_config.mail_host 为空')
            if not mail_host_port:
                raise RuntimeError('send_email_config.mail_host_port 为空')
            if not mail_from_account:
                raise RuntimeError('send_email_config.mail_from_account 为空')
            if not mail_from_account_dwp:
                raise RuntimeError('send_email_config.mail_from_account_dwp 为空')
            # smtplib rejects an empty recipient list anyway; fail before
            # opening a connection and logging in for nothing.
            if not to_mails:
                raise RuntimeError('to_mails 为空')

            smtp = smtplib.SMTP(host=mail_host, port=mail_host_port, timeout=timeout)
            smtp.login(mail_from_account, mail_from_account_dwp)

            real_notice_content = notice_content
            if not isinstance(notice_content, Message):
                real_notice_content = MIMEText(MyStringUtils.to_str(notice_content), "plain", "utf-8")
                real_notice_content['Subject'] = Header(subject, 'utf-8')

            smtp.sendmail(mail_from_account, to_mails, real_notice_content.as_string())
            return True
        except Exception as e:
            # Exception (not BaseException): KeyboardInterrupt / SystemExit
            # must still be able to stop the process.
            logging.error("发送邮件时发生异常")
            logging.exception(e)
            return False
        finally:
            if smtp is not None:
                try:
                    smtp.quit()
                except Exception:
                    # The connection may already be broken. An error raised
                    # here would replace the result computed above, so only
                    # record it and release the socket.
                    logging.warning("failed to close the SMTP session cleanly", exc_info=True)
                    smtp.close()

    def do_send_sms(self, phone_list: list, notice_content: str = None,
                    timeout: Optional[float] = DEFAULT_NETWORK_TIMEOUT) -> bool:
        r"""
        Send one SMS to the given numbers through the HTTP API in ``send_sms_config``.

        The request is a JSON POST with the keys ``sysName``, ``token``,
        ``content`` and ``phoneNoList``. The call counts as successful only
        when the HTTP status is 200 and the JSON reply has ``RetCode`` equal
        to ``1``. Failures are logged and reported through the return value;
        ordinary exceptions are never propagated to the caller.

        :param phone_list: recipient phone numbers
        :param notice_content: message text
        :param timeout: timeout in seconds for the HTTP request; ``None`` disables it
        :return: ``True`` on success, ``False`` otherwise
        """
        try:
            if self.send_sms_config is None:
                raise RuntimeError('send_sms_config为空')
            api_server = self.send_sms_config.api_server
            sys_name = self.send_sms_config.sys_name
            sys_token = self.send_sms_config.sys_token
            if not api_server:
                raise RuntimeError('send_sms_config.api_server 为空')
            if not sys_name:
                raise RuntimeError('send_sms_config.sys_name 为空')
            if not sys_token:
                raise RuntimeError('send_sms_config.sys_token 为空')

            # Accept a bare host[:port]/path by defaulting to plain HTTP.
            schema_tuple = ("http://", "https://")
            default_schema = 'http://'
            if not api_server.lower().startswith(schema_tuple):
                api_server = Utility.join_str(default_schema, api_server)

            map_header = {'Content-Type': 'application/json'}
            request_dict = {
                'sysName': sys_name,
                'token': sys_token,
                'content': notice_content,
                'phoneNoList': phone_list,
            }

            # NOTE(review): verify=False disables TLS certificate checks for
            # https endpoints. Left unchanged because existing deployments may
            # rely on it; consider making it configurable.
            resp = requests.post(api_server, data=Utility.dict2jsonstr(request_dict), headers=map_header,
                                 verify=False, timeout=timeout)

            # Check the HTTP status code
            if resp.status_code != 200:
                raise RuntimeError(Utility.join_str(f"调用sms ApiServer接口返回状态码为{resp.status_code}"))

            # Check RetCode in the reply body
            resp_dict = Utility.jsonstr2dict(resp.text)
            if resp_dict is None:
                raise RuntimeError(Utility.join_str("调用sms ApiServer接口返回内容不是jsonObject,返回内容为: ", resp.text))
            ret_code = resp_dict.get('RetCode')
            if str(ret_code) != '1':
                raise RuntimeError(Utility.join_str("调用sms ApiServer接口返回RetCode不是1, 返回内容为: ", resp.text))

            return True
        except Exception as e:
            # Exception (not BaseException): KeyboardInterrupt / SystemExit
            # must still be able to stop the process.
            logging.error("发送短信时发生异常")
            logging.exception(e)
            return False