implementing an EmailHandler to send out emails (potentially with attachment).

This commit is contained in:
scopesorting 2023-12-15 17:37:27 +01:00 committed by Max Metz
parent 06bad205de
commit f3818a1b2f

View File

@ -0,0 +1,174 @@
import os
import typing
import smtplib
from getpass import getpass
from email.message import EmailMessage
import mimetypes
import email
# from email.mime.base import MIMEBase
# from email.mime.image import MIMEImage
# from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.mime.application import MIMEApplication
class EmailHandler():
"""
Creates an EmailHandler, which is capable of connecting to a mail server at a respective port,
as well as logging into a specific user's mail address.
Upon creating messages, these can be sent via this handler.
Options:
mail_server: address of the server, such as 'smtp.gmail.com' or 'w01d5503.kasserver.com
mail_port:
25 - SMTP Port, to send emails
110 - POP3 Port, to receive emails
143 - IMAP Port, to receive from IMAP
465 - SSL Port of SMTP
587 - alternative SMTP Port
993 - SSL/TLS-Port of IMAP
995 - SSL/TLS-Port of POP3
mail_address: a specific user's Email address, which will be used to send Emails. Example: "my_user@gmail.com"
"""
def __init__(self, mail_server:str, mail_port:int, mail_address:str):
self.mail_server = mail_server
self.mail_port = mail_port
self.mail_address = mail_address
self.server = smtplib.SMTP_SSL(self.mail_server, self.mail_port) # alternatively, SMTP
def check_state(self):
"""check, whether the server login took place and is open."""
try:
(status_code, status_msg) = self.server.noop()
return status_code==250 # 250: b'2.0.0 Ok'
except smtplib.SMTPServerDisconnected:
return False
def check_connection(self):
"""check, whether the server object is connected to the server. If not, connect it. """
try:
self.server.ehlo()
except smtplib.SMTPServerDisconnected:
self.server.connect(self.mail_server, self.mail_port)
return
def check_login(self)->bool:
"""check, whether the server object is logged in as a user"""
user = self.server.__dict__.get("user",None)
return user is not None
def login(self, interactive:bool=True):
"""
login on the determined mail server's mail address. By default, this function opens an interactive window to
type the password without echoing (printing '*******' instead of readable characters).
returns (status_code, status_msg)
"""
self.check_connection()
if interactive:
(status_code, status_msg) = self.server.login(self.mail_address, password=getpass())
else:
# fernet + password file
raise NotImplementedError()
return (status_code, status_msg) # should be: (235, b'2.7.0 Authentication successful')
def create_email(self, subject:str, message_body:str)->EmailMessage:
"""
Create an EmailMessage object, which contains the Email's header ("Subject"), content ("Message Body") and the sender's address ("From").
The EmailMessage object does not contain the recipients yet, as these will be defined upon sending the Email.
"""
msg = EmailMessage()
msg["Subject"] = subject
msg["From"] = self.mail_address
#msg["To"] = email_tgts # will be defined in self.send_email
msg.set_content(message_body)
return msg
def build_recipients(self, email_tgts:list[str]):
"""
email formatting does not support lists. Instead, items are joined into a comma-space-separated string.
Example:
[mail1@mail.com, mail2@mail.com] becomes
'mail1@mail.com, mail2@mail.com'
"""
return ', '.join(email_tgts)
def open_mime_application(self, path:str)->MIMEApplication:
"""open a local file, read the bytes into a MIMEApplication object, which is built with the proper subtype (based on the file extension)"""
with open(path, 'rb') as file:
attachment = MIMEApplication(file.read(), _subtype=mimetypes.MimeTypes().guess_type(path))
attachment.add_header('Content-Disposition','attachment',filename=str(os.path.basename(path)))
return attachment
def attach_file(self, path:str, msg:email.mime.multipart.MIMEMultipart)->None:
"""
attach a file to the message. This function opens the file, reads its bytes, defines the mime type by the
path extension. The filename is appended as the header.
mimetypes: # https://developer.mozilla.org/en-US/docs/Web/HTTP/Basics_of_HTTP/MIME_types/Common_types
"""
attachment = self.open_mime_application(path)
msg.attach(attachment)
return
def send_email(self, msg:EmailMessage, email_tgts:list[str], cc_tgts:typing.Optional[list[str]]=None, bcc_tgts:typing.Optional[list[str]]=None, debug:bool=False)->typing.Union[dict,EmailMessage]:
"""
send a prepared email message to recipients (email_tgts), copy (cc_tgts) and blind copy (bcc_tgts).
Returns a dictionary of feedback, which is commonly empty and the EmailMessage.
When failing, this function returns an SMTP error instead of returning the default outputs.
"""
# Set the Recipients
msg["To"] = self.build_recipients(email_tgts)
# optionally, add CC and BCC (copy and blind-copy)
if cc_tgts is not None:
msg["Cc"] = self.build_recipients(cc_tgts)
if bcc_tgts is not None:
msg["Bcc"] = self.build_recipients(bcc_tgts)
# when debugging, do not send the Email, but return the EmailMessage.
if debug:
return {}, msg
assert self.check_login(), f"currently not logged in. Cannot send an Email. Make sure to properly use self.login first. "
# send the prepared EmailMessage via the server.
feedback = self.server.send_message(msg)
return feedback, msg
def translate_mail_to_multipart(self, msg:EmailMessage):
"""EmailMessage does not support HTML and attachments. Hence, one can convert an EmailMessage object."""
if msg.is_multipart():
return msg
# create a novel MIMEMultipart email
msg_new = MIMEMultipart("mixed")
headers = list((k, v) for (k, v) in msg.items() if k not in ("Content-Type", "Content-Transfer-Encoding"))
# add the headers of msg to the new message
for k,v in headers:
msg_new[k] = v
# delete the headers from msg
for k,v in headers:
del msg[k]
# attach the remainder of the msg, such as the body, to the MIMEMultipart
msg_new.attach(msg)
return msg_new
def print_email_attachments(self, msg:MIMEMultipart)->list[str]:
"""return a list of lines of an Email, which contain 'filename=' as a list. """
return [line_ for line_ in msg.as_string().split("\n") if "filename=" in line_]
def close(self):
self.server.__dict__.pop("user",None)
self.server.__dict__.pop("password",None)
# quit the server connection (internally uses .close)
self.server.quit()
return