import os
import typing
import smtplib
from getpass import getpass
from email.message import EmailMessage
import mimetypes
import email
# from email.mime.base import MIMEBase
# from email.mime.image import MIMEImage
# from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.application import MIMEApplication


class EmailHandler():
    """
    Creates an EmailHandler, which is capable of connecting to a mail server
    at a respective port, as well as logging into a specific user's mail
    address. Upon creating messages, these can be sent via this handler.

    Options:
        mail_server: address of the server, such as 'smtp.gmail.com' or
            'w01d5503.kasserver.com'
        mail_port:
             25 - SMTP Port, to send emails
            110 - POP3 Port, to receive emails
            143 - IMAP Port, to receive from IMAP
            465 - SSL Port of SMTP
            587 - alternative SMTP Port
            993 - SSL/TLS-Port of IMAP
            995 - SSL/TLS-Port of POP3
        mail_address: a specific user's Email address, which will be used to
            send Emails. Example: "my_user@gmail.com"

    Note: the handler wraps ``smtplib.SMTP_SSL``; the connection is opened
    as soon as the handler is constructed.
    """

    def __init__(self, mail_server: str, mail_port: int, mail_address: str):
        self.mail_server = mail_server
        self.mail_port = mail_port
        self.mail_address = mail_address
        # SMTP_SSL connects immediately when host/port are given.
        self.server = smtplib.SMTP_SSL(self.mail_server, self.mail_port)  # alternatively, SMTP

    def check_state(self) -> bool:
        """Return True if the server answers a NOOP with status 250.

        This only probes the connection; it does not tell whether a user is
        logged in (see ``check_login``). Returns False when the server object
        is disconnected.
        """
        try:
            status_code, _status_msg = self.server.noop()
        except smtplib.SMTPServerDisconnected:
            return False
        return status_code == 250  # 250: b'2.0.0 Ok'

    def check_connection(self) -> None:
        """Make sure the server object is connected; reconnect if it is not.

        An EHLO is sent as a probe. If smtplib reports the connection as
        gone, a new connection to ``mail_server:mail_port`` is opened.
        """
        try:
            self.server.ehlo()
        except smtplib.SMTPServerDisconnected:
            self.server.connect(self.mail_server, self.mail_port)

    def check_login(self) -> bool:
        """Return True if the server object holds login credentials.

        smtplib stores the user name on the server object during ``login``;
        ``login`` and ``close`` of this class clear it again on failure and
        on shutdown, so its presence is used as the logged-in marker.
        """
        return getattr(self.server, "user", None) is not None

    def _forget_credentials(self) -> None:
        """Drop the user/password that smtplib cached on the server object."""
        cached = vars(self.server)
        cached.pop("user", None)
        cached.pop("password", None)

    def login(self, interactive: bool = True):
        """
        Log in on the mail server with ``mail_address``.

        By default, the password is requested interactively via ``getpass``
        (typed without echo).

        Returns:
            (status_code, status_msg) as returned by ``smtplib``'s login,
            e.g. (235, b'2.7.0 Authentication successful').

        Raises:
            NotImplementedError: if ``interactive`` is False (a password-file
                based login is not implemented yet).
            smtplib.SMTPException: if the server rejects the login.
        """
        self.check_connection()
        if not interactive:
            # fernet + password file
            raise NotImplementedError()
        try:
            return self.server.login(self.mail_address, password=getpass())
        except smtplib.SMTPException:
            # smtplib caches user/password *before* authenticating. Remove
            # them so check_login() does not report a failed login as valid.
            self._forget_credentials()
            raise

    def create_email(self, subject: str, message_body: str) -> EmailMessage:
        """
        Create an EmailMessage with "Subject", "From" (the handler's
        ``mail_address``) and the plain-text body.

        The recipients are not set here; they are added by ``send_email``.
        """
        msg = EmailMessage()
        msg["Subject"] = subject
        msg["From"] = self.mail_address
        msg.set_content(message_body)
        return msg

    def build_recipients(self, email_tgts: list[str]) -> str:
        """
        Join a list of addresses into one comma-space-separated string, as
        address headers take a single string rather than a list.

        Example: ['mail1@mail.com', 'mail2@mail.com'] becomes
        'mail1@mail.com, mail2@mail.com'
        """
        return ', '.join(email_tgts)

    def open_mime_application(self, path: str) -> MIMEApplication:
        """Read a local file into a MIMEApplication attachment part.

        The subtype is guessed from the file extension. Files whose type is
        unknown, that carry a content encoding (e.g. ``.tar.gz``), or whose
        guessed type is not ``application/*`` are sent as the generic
        ``application/octet-stream``. The file's base name is set as the
        attachment filename.
        """
        guessed_type, encoding = mimetypes.guess_type(path)
        subtype = "octet-stream"
        if guessed_type is not None and encoding is None:
            maintype, _, guessed_subtype = guessed_type.partition("/")
            # MIMEApplication always uses the 'application' main type, so a
            # subtype of another family (e.g. image/png) must not be reused.
            if maintype == "application" and guessed_subtype:
                subtype = guessed_subtype
        with open(path, 'rb') as file:
            attachment = MIMEApplication(file.read(), _subtype=subtype)
        attachment.add_header('Content-Disposition', 'attachment',
                              filename=os.path.basename(path))
        return attachment

    def attach_file(self, path: str, msg: email.mime.multipart.MIMEMultipart) -> None:
        """
        Attach a local file to a multipart message.

        The file is read and wrapped by ``open_mime_application`` and then
        attached to ``msg`` in place.
        mimetypes:
        # https://developer.mozilla.org/en-US/docs/Web/HTTP/Basics_of_HTTP/MIME_types/Common_types
        """
        msg.attach(self.open_mime_application(path))

    def send_email(self, msg: EmailMessage, email_tgts: list[str],
                   cc_tgts: typing.Optional[list[str]] = None,
                   bcc_tgts: typing.Optional[list[str]] = None,
                   debug: bool = False) -> tuple[dict, EmailMessage]:
        """
        Send a prepared message to recipients (email_tgts), copy (cc_tgts)
        and blind copy (bcc_tgts).

        Recipient headers already present on ``msg`` are replaced, so the
        same message can be sent again (for example after a debug dry run).

        Returns:
            (feedback, msg): ``feedback`` is the dictionary returned by
            ``smtplib``'s ``send_message`` (one entry per refused recipient,
            commonly empty). With ``debug=True`` nothing is sent and
            ``({}, msg)`` is returned.

        Raises:
            AssertionError: if the handler is not logged in.
            smtplib.SMTPException: if the server refuses the message.
        """
        recipient_headers = (("To", email_tgts), ("Cc", cc_tgts), ("Bcc", bcc_tgts))
        for header_name, targets in recipient_headers:
            if not targets:
                # Nothing to set; an empty address header would be invalid.
                continue
            # Deleting a missing header is a no-op; deleting an existing one
            # avoids duplicates (or a ValueError on EmailMessage) on re-send.
            del msg[header_name]
            msg[header_name] = self.build_recipients(targets)

        # when debugging, do not send the Email, but return the EmailMessage.
        if debug:
            return {}, msg

        # Raised explicitly instead of using `assert`, so that the check is
        # not stripped when Python runs with -O.
        if not self.check_login():
            raise AssertionError(
                "currently not logged in. Cannot send an Email. Make sure to properly use self.login first. "
            )

        # send the prepared EmailMessage via the server.
        feedback = self.server.send_message(msg)
        return feedback, msg

    def translate_mail_to_multipart(self, msg: EmailMessage) -> typing.Union[EmailMessage, MIMEMultipart]:
        """Wrap a single-part message in a 'multipart/mixed' container.

        A message that is already multipart is returned unchanged. Otherwise
        the envelope headers (Subject, From, ...) are moved from ``msg`` to a
        new MIMEMultipart and ``msg`` itself, now reduced to its body and its
        ``Content-*`` headers, is attached as the first part. Note that
        ``msg`` is modified in place.
        """
        if msg.is_multipart():
            return msg

        msg_new = MIMEMultipart("mixed")
        moved = []
        for key, value in msg.items():
            lowered = key.lower()
            if lowered.startswith("content-"):
                # Content-* headers describe the body and stay with the part.
                continue
            if lowered != "mime-version":
                # MIMEMultipart already carries its own MIME-Version header;
                # copying it would create a duplicate.
                msg_new[key] = value
            moved.append(key)

        # remove the moved headers from the body part
        for key in moved:
            del msg[key]

        # attach the remainder of the msg, such as the body, to the MIMEMultipart
        msg_new.attach(msg)
        return msg_new

    def print_email_attachments(self, msg: MIMEMultipart) -> list[str]:
        """Return the lines of the serialized Email that contain 'filename='."""
        return [line_ for line_ in msg.as_string().split("\n") if "filename=" in line_]

    def close(self) -> None:
        """Forget the cached credentials and quit the server connection.

        Calling this on an already disconnected handler is harmless.
        """
        self._forget_credentials()
        try:
            # quit the server connection (internally uses .close)
            self.server.quit()
        except smtplib.SMTPServerDisconnected:
            # Connection is already gone; just release local resources.
            self.server.close()