Created an HTML-based Email notification template, which is adaptive to error messages. It includes a logo file from the local repository (logo_bremen_calling.png). Refactoring methods and clearing old methods. Created a one-line function to connect to the Email server. Included the Notifier's sending method in the routine will be executed every 15 minutes and includes those shipcalls, which have been evaluated at least 10 minutes ago. Located the logo file and email template in a resources/ folder within the library. Manually overwriting the Emailaddress of all notification recipients, so the BSMD mail is used throughout testing.

This commit is contained in:
Max Metz 2024-07-31 19:01:17 +02:00
parent 8060b0ee30
commit 40e7a4b3af
15 changed files with 704 additions and 112 deletions

View File

@ -33,7 +33,8 @@ def get_synchronous_shipcall_times_standalone(query_time:pd.Timestamp, all_df_ti
returns: counts returns: counts
""" """
assert isinstance(query_time,pd.Timestamp) assert isinstance(query_time,pd.Timestamp) or pd.isnull(query_time), f"expected query_time to be a pd.Timestamp or pd.NaT. Found: {type(query_time)}"
assert isinstance(all_df_times,pd.DataFrame)
# get a timedelta for each valid (not Null) time entry # get a timedelta for each valid (not Null) time entry
time_deltas_eta = [(query_time.to_pydatetime()-time_.to_pydatetime()) for time_ in all_df_times.loc[:,"eta_berth"] if not pd.isnull(time_)] time_deltas_eta = [(query_time.to_pydatetime()-time_.to_pydatetime()) for time_ in all_df_times.loc[:,"eta_berth"] if not pd.isnull(time_)]
@ -439,4 +440,5 @@ class SQLHandler():
def count_synchronous_shipcall_times(self, query_time:pd.Timestamp, all_df_times:pd.DataFrame, delta_threshold=900)->int: def count_synchronous_shipcall_times(self, query_time:pd.Timestamp, all_df_times:pd.DataFrame, delta_threshold=900)->int:
"""count all times entries, which are too close to the query_time. The {delta_threshold} determines the threshold. returns counts (int)""" """count all times entries, which are too close to the query_time. The {delta_threshold} determines the threshold. returns counts (int)"""
assert isinstance(all_df_times, pd.DataFrame)
return get_synchronous_shipcall_times_standalone(query_time, all_df_times, delta_threshold) return get_synchronous_shipcall_times_standalone(query_time, all_df_times, delta_threshold)

View File

@ -222,7 +222,7 @@ class SQLQuery():
"api_key, notify_email, notify_whatsapp, notify_signal, notify_popup, created, modified FROM user " +\ "api_key, notify_email, notify_whatsapp, notify_signal, notify_popup, created, modified FROM user " +\
"WHERE user_name = ?username? OR user_email = ?username?" "WHERE user_name = ?username? OR user_email = ?username?"
return query return query
@staticmethod @staticmethod
def get_notifications()->str: def get_notifications()->str:
query = "SELECT id, shipcall_id, level, type, message, created, modified FROM notification " + \ query = "SELECT id, shipcall_id, level, type, message, created, modified FROM notification " + \
@ -266,6 +266,11 @@ class SQLQuery():
query = "SELECT id, name, imo, callsign, participant_id, length, width, is_tug, bollard_pull, eni, created, modified, deleted FROM ship ORDER BY name" query = "SELECT id, name, imo, callsign, participant_id, length, width, is_tug, bollard_pull, eni, created, modified, deleted FROM ship ORDER BY name"
return query return query
@staticmethod
def get_ship_by_id()->str:
query = "SELECT id, name, imo, callsign, participant_id, length, width, is_tug, bollard_pull, eni, created, modified, deleted FROM ship WHERE id = ?id?"
return query
@staticmethod @staticmethod
def get_times()->str: def get_times()->str:
query = "SELECT id, eta_berth, eta_berth_fixed, etd_berth, etd_berth_fixed, lock_time, lock_time_fixed, " + \ query = "SELECT id, eta_berth, eta_berth_fixed, etd_berth, etd_berth_fixed, lock_time, lock_time_fixed, " + \
@ -321,6 +326,22 @@ class SQLQuery():
query = prefix+stage1+bridge+stage2+suffix query = prefix+stage1+bridge+stage2+suffix
return query return query
@staticmethod
def get_notifications_post(schemaModel:dict)->str:
param_keys = {key:key for key in schemaModel.keys()}
prefix = "INSERT INTO notification ("
bridge = ") VALUES ("
suffix = ")"
non_dynamic_keys = ["id", "created", "modified"]
stage1 = ",".join([key for key in schemaModel.keys() if not key in non_dynamic_keys])
stage2 = ",".join([f"?{param_keys.get(key)}?" for key in schemaModel.keys() if not key in non_dynamic_keys])
query = prefix+stage1+bridge+stage2+suffix
return query
@staticmethod @staticmethod
def get_last_insert_id()->str: def get_last_insert_id()->str:

View File

@ -0,0 +1,7 @@
"""This file contains login information to register into distinct notification accounts."""
mail_server = 'w01d5503.kasserver.com'
mail_port=465
mail_address="max.metz@scope-sorting.com"
mail_pwd = b'gAAAAABmqJlkXbtJTL1tFiyQNHhF_Y7sgtVI0xEx07ybwbX70Ro1Vp73CLDq49eFSYG-1SswIDQ2JBSORYlWaR-Vh2kIwPHy_lX8SxkySrRvBRzkyZP5x0I='

View File

@ -1,7 +1,19 @@
import typing import typing
import datetime
from BreCal.database.sql_handler import execute_sql_query_standalone from BreCal.database.sql_handler import execute_sql_query_standalone
from BreCal.database.sql_queries import SQLQuery from BreCal.database.sql_queries import SQLQuery
from BreCal.schemas import model from BreCal.schemas import model
from BreCal.brecal_utils.time_handling import difference_to_then
from BreCal.notifications.accounts import mail_server, mail_port, mail_address, mail_pwd
from BreCal.services.email_handling import EmailHandler, create_shipcall_evaluation_notification, send_notification, get_default_html_email
from BreCal.database.enums import ParticipantwiseTimeDelta
eta_etd_type_dict = {
model.ShipcallType.arrival : "Ankunft",
model.ShipcallType.departure : "Abfahrt",
model.ShipcallType.shifting : "Wechselnd"
}
class Notifier(): class Notifier():
""" """
@ -35,112 +47,82 @@ class Notifier():
- iterate over each remaining shipcall and apply .send_notification - iterate over each remaining shipcall and apply .send_notification
- those which are unsent, shall be sent by the respective type - those which are unsent, shall be sent by the respective type
""" """
raise NotImplementedError("skeleton") # set a threshold, when alarm event notifications become eligible
time_diff_threshold = float(ParticipantwiseTimeDelta.NOTIFICATION)*60 # m minutes, converted to seconds
debug = is_test # if is_test, the Emails will not be issued. Only a print message will be created.
update_database = True if not is_test else False # if_test, the database will not be updated.
time_diff_threshold = time_diff_threshold if not is_test else 0.0 # 0.0 delay when is_test is set.
# get all shipcalls email_handler = EmailHandler(mail_server=mail_server, mail_port=mail_port, mail_address=mail_address)
all_shipcalls = NotImplementedError
shipcalls = [shipcall for shipcall in all_shipcalls if not shipcall.evaluation_notifications_sent] # get candidates: find all eligible shipcalls, where the evaluation state is yellow or red & the notifications are not yet sent
for shipcall in shipcalls: eligible_shipcalls = Notifier.get_eligible_shipcalls(time_diff_threshold)
notification_list = Notifier.send_notification(shipcall, is_test=is_test)
# #TODO: get all notifications # get a list of tuples, where (shipcall, users) are combined for easier iterations.
# #TODO: get matching shipcall (based on shipcall_id) notification_instructions = Notifier.build_notification_tuples(eligible_shipcalls)
# #TODO: filter: consider only those, which are not yet sent if len(notification_instructions)>0: # only perform a login when there are any notification_instructions
try:
# login in advance, so the email handler uses a shared connection. It disconnects only once at the end of the call.
email_handler.login(interactive=False, pwd=mail_pwd)
for shipcall, users in notification_instructions:
assert isinstance(shipcall, model.Shipcall)
assert isinstance(users,list)
assert all([isinstance(user,model.User) for user in users])
# identify necessity # iterate over each notification type
# #TODO: get the 'evaluation_notifications_sent' field from all shipcalls (based on shipcall_id) for notification_type in Notifier.get_all_notification_types():
# if not -> return # consider only those users, which have subscribed to the respective notification type
# USE shipcall.evaluation_notifications_sent eligible_users = Notifier.get_eligible_users(users, notification_type)
# #TODO: those which are unsent, shall be created&sent by the respective type -- Note: consider the is_test argument if len(eligible_users)>0:
# iterate over the list of Notifier.build_notification_type_list Notifier.create_and_send_notification_mapper(notification_type, shipcall, eligible_users, email_handler, mail_pwd, debug=debug)
# one might use Notifier.create(..., update_database=True)
# use the History (GetHistory -- by shipcall_id) to identify all subscribed users
# #TODO: update the shipcall dataset ('evaluation_notifications_sent') -- Note: consider the is_test argument if not debug:
# populate notifications within the database to keep track
Notifier.generate_notification(shipcall, notification_type)
# #TODO_clarify: how to handle the 'evaluation_notifications_sent', when there is no recipient? # update the database entries, so notifications are only sent once.
if update_database:
Notifier.shipcall_put_update_evaluation_notifications_sent_flag(shipcall)
# #TODO: except... create log?
finally:
email_handler.close()
return return
@staticmethod
def build_notification_tuples(eligible_shipcalls)->list[typing.Tuple[model.Shipcall,list[model.User]]]:
"""
creates tuples, where shipcall and the list of attached users are grouped. One can iterate over the tuples
to perform actions, such as issueing notifications
"""
notification_instructions = []
for shipcall in eligible_shipcalls:
# get all users, which are attached to the shipcall (uses the History dataset)
users = Notifier.get_users_via_history(shipcall.id)
# create and store tuples of (shipcall, users) for each eligible shipcall
notification_instructions.append((shipcall, users))
return notification_instructions
@staticmethod @staticmethod
def send_notification(shipcall:model.Shipcall, is_test:bool=False)->list[model.Notification]: def get_eligible_users(users, notification_type):
""" eligible_users = [user for user in users if Notifier.check_user_is_subscribed_to_notification_type(user,notification_type=notification_type)]
Complex-function, which is responsible of creating notification messages, issuing them to users and optionally updating
the database. The requirement is, that the notification is required and passes through an internal set of filters.
Steps: # filter: consider only those users, where an Email is set
- get all notifications of shipcall_id # #TODO: this is Email-specific and should not be a filter for other notifications
- identify the assigned list of users eligible_users = [user for user in eligible_users if user.user_email is not None]
- apply all filters. When a filter triggers, exit. If not, create and send a notification. return eligible_users
"""
update_database = False if is_test else True
# #TODO: the concept of old state and new state must be refactored.
# old state: read shipcall_id from notifications and look for the latest finding (if None -> EvaluationType.undefined)
# new state: read shipcall_id from shipcalls and look for the *current* 'evaluation' (-> EvaluationType(value))
# get existing notifications by shipcall_id (list)
existing_notifications = Notifier.get_existing_notifications(shipcall_id=shipcall.id)
old_state = NotImplementedError
new_state = shipcall.evaluation
# get User by querying all History objects of a shipcall_id
users = Notifier.get_users_via_history(shipcall_id=shipcall.id)
# identify necessity
# state-check: Did the 'evaluation' shift to a higher level of severity?
severity_bool = Notifier.check_higher_severity(old_state, new_state)
if not severity_bool:
return None
# #TODO: time-based filter. There shall be 'enough' time between the evaluation time and NOW
evaluation_time = shipcall.evaluation_time
# latency_bool = #TODO_DIFFERENCE_FROM_NOW_TO_EVALUATION_TIME____THIS_METHOD_ALREADY_EXISTS(evaluation_time)
# careful: what is True, what is False?
# if latency_booL:
# return None
notification_list = []
for user in users:
notification = Notifier.create(
shipcall.id,
old_state,
new_state,
user,
update_database=update_database,
is_test=is_test
)
notification_list.append(notification)
return notification_list
@staticmethod
def publish(shipcall_id, old_state, new_state, user, update_database:bool=False)->typing.Optional[model.Notification]:
"""
Complex-function, which creates, sends and documents a notification. It serves as a convenience function.
The method does not apply internal filters to identify, whether a notification should be created in the first place.
options:
update_database: bool.
# #TODO: instead of update_database, one may also use is_test
"""
# 1.) create
# ... = Notifier.create(shipcall_id, old_state, new_state, user) # e.g., might return a dictionary of dict[model.NotificationType, str], where str is the message
# 2.) send
# ... = Notifier.send(...) # should contain internal 'logistics', which user the respective handlers to send notifications
# 3.) document (mysql database)
# if update_database
# ... = Notifier.document(...)
raise NotImplementedError("skeleton")
return
@staticmethod @staticmethod
def create(shipcall_id, old_state, new_state, user, update_database:bool=False)->typing.Optional[model.Notification]: def create(shipcall_id, old_state, new_state, user, update_database:bool=False)->typing.Optional[model.Notification]:
""" """
# #TODO_refactor: drastically change this method. It should only generate notifications, but not send them.
Standalone function, which creates a Notification for a specific user. Standalone function, which creates a Notification for a specific user.
Steps: Steps:
@ -173,6 +155,7 @@ class Notifier():
# get a list of all subscribed notification types and track the state (success or failure) # get a list of all subscribed notification types and track the state (success or failure)
raise NotImplementedError("skeleton")
successes = {} successes = {}
notification_type_list = Notifier.build_notification_type_list(user) notification_type_list = Notifier.build_notification_type_list(user)
for notification_type in notification_type_list: for notification_type in notification_type_list:
@ -183,7 +166,6 @@ class Notifier():
success_state = Notifier.send_notification_by_type(notification_type, message) success_state = Notifier.send_notification_by_type(notification_type, message)
successes[notification_type] = success_state successes[notification_type] = success_state
raise NotImplementedError("skeleton")
notification = ... notification = ...
return notification return notification
@ -197,11 +179,10 @@ class Notifier():
def get_users_via_history(shipcall_id:int)->list[model.User]: def get_users_via_history(shipcall_id:int)->list[model.User]:
"""using the History objects, one can infer the user_id, which allows querying the Users""" """using the History objects, one can infer the user_id, which allows querying the Users"""
histories = execute_sql_query_standalone(query=SQLQuery.get_history(), param={"shipcallid" : shipcall_id}, model=model.History, command_type="query") histories = execute_sql_query_standalone(query=SQLQuery.get_history(), param={"shipcallid" : shipcall_id}, model=model.History, command_type="query")
user_ids = [ assert isinstance(histories,list)
history.user_id assert all([isinstance(history,model.History) for history in histories])
for history in histories
] users = [Notifier.get_user(history.user_id) for history in histories]
users = [Notifier.get_user(user_id) for user_id in user_ids]
return users return users
@staticmethod @staticmethod
@ -306,8 +287,249 @@ class Notifier():
else: else:
raise ValueError(notification_type) raise ValueError(notification_type)
return return
@staticmethod
def get_eligible_shipcalls(time_diff_threshold:float):
"""
get all eligible shipcalls, which do not have a sent notification yet
criterion a) notification shall not be sent yet (evaluation_notifications_sent = 0)
criterion b) evaluation state is yellow or red (type 2 or 3)
"""
query = 'SELECT * FROM shipcall WHERE (evaluation_notifications_sent = ?evaluation_notifications_sent?) AND (evaluation = 2 OR evaluation = 3)'
evaluation_notifications_sent = 0
# filter out the shipcalls, where the notifications were already sent (ignores null values, which is as expected)
eligible_shipcalls = execute_sql_query_standalone(query=query, model=model.Shipcall, param={"evaluation_notifications_sent" : evaluation_notifications_sent})
# filter out the shipcalls, where evaluation_time is not set
eligible_shipcalls = [shipcall for shipcall in eligible_shipcalls if shipcall.evaluation_time is not None]
# filter out the shipcalls, where the evaluation_time is too recent. We expect a minimum difference, so user input errors do not cause notifications
eligible_shipcalls = [shipcall for shipcall in eligible_shipcalls if Notifier.check_shipcall_evaluation_time_exceeds_minimum_time_difference(shipcall, time_diff_threshold)]
return eligible_shipcalls
@staticmethod
def get_eligible_notifications_of_shipcall(shipcall:model.Shipcall, time_diff_threshold:float)->list[model.Notification]:
"""obtain all notifications, which belong to the shipcall id"""
query = SQLQuery.get_notifications()
eligible_notifications = execute_sql_query_standalone(query=query, model=model.Notification, param={"scid" : shipcall.id})
eligible_notifications = [notification for notification in eligible_notifications if Notifier.check_notification_level_matches_shipcall_entry(notification, shipcall)]
return eligible_notifications
@staticmethod
def check_shipcall_evaluation_time_exceeds_minimum_time_difference(shipcall:model.Shipcall, time_diff_threshold:float):
"""
a notification may only be sent, when the created notification has been created or modified {time_diff_threshold} seconds ago.
"""
assert (shipcall.evaluation_time is not None), f"must provide 'evaluation_time'"
return difference_to_then(shipcall.evaluation_time)>=time_diff_threshold
@staticmethod
def check_notification_level_matches_shipcall_entry(notification, shipcall):
"""
a notification may only be sent, when the shipcall entry matches the notification level.
otherwise, a user may have adapted the shipcall in the mean-time, so a notification would no longer be useful.
"""
return int(shipcall.evaluation) == int(notification.level)
@staticmethod
def get_eligible_notifications(shipcalls:list[model.Shipcall]):
"""obtain a list of all notifications of each element of the shipcall list."""
eligible_notifications = []
for shipcall in shipcalls:
eligible_notification = Notifier.get_eligible_notifications_of_shipcall(shipcall)
eligible_notifications.extend(eligible_notification)
raise NotImplementedError("refactoring!")
return eligible_notifications
@staticmethod
def create_notifications_for_user_list(shipcall, users:list[model.User]):
raise NotImplementedError("deprecated")
notification_type_list = []
for user in users:
user_notification_type_list = Notifier.build_notification_type_list(user)
notification_type_list.extend(user_notification_type_list)
# get the unique notification types
notification_type_list = list(set(notification_type_list))
for notification_type in notification_type_list:
schemaModel = dict(shipcall_id = shipcall.id, level = int(shipcall.evaluation), type = notification_type, message = "", created = datetime.datetime.now(), modified=None)
query = SQLQuery.get_notifications_post(schemaModel)
schemas = execute_sql_query_standalone(query=query, param=schemaModel, command_type="execute")
return
@staticmethod
def generate_notification(shipcall:model.Shipcall, notification_type:model.NotificationType):
"""
This one-line method creates a notification for the provided shipcall and notification type
"""
schemaModel = dict(
shipcall_id = shipcall.id,
level = int(shipcall.evaluation),
type = notification_type,
message = "", # #TODO_messsage: what should be stored here? The HTML-template appears to be too long.
created = datetime.datetime.now(),
modified=None)
query = SQLQuery.get_notifications_post(schemaModel)
schemas = execute_sql_query_standalone(query=query, param=schemaModel, command_type="execute")
return
@staticmethod
def DEPRECATED_generate_notifications(shipcall_id):
"""
# #TODO_delete: this method can be safely removed after 15.08.2024
This one-line method creates all notifications for the provided shipcall id. It does so by obtaining the shipcall,
looking up its history, and finding all attached users.
For each user, a notification will be created for each subscribed notification type (e.g., Email)
"""
shipcall = Notifier.get_shipcall(shipcall_id)
notifications = execute_sql_query_standalone(query=SQLQuery.get_notifications(), param={"scid" : shipcall_id}, model=model.Notification, command_type="query")
latest_notification = Notifier.find_latest_notification(notifications)
old_state = model.EvaluationType(latest_notification.level) if latest_notification is not None else model.EvaluationType.undefined
new_state = shipcall.evaluation
# identify, whether the severity of the shipcall has increased to see, whether a notification is required
severity_increase = Notifier.check_higher_severity(old_state=old_state, new_state=new_state)
# when the severity increases, set the 'evaluation_notifications_sent' argument to 0 (False)
if severity_increase:
### UPDATE Shipcall ###
# prepare and create a query
evaluation_notifications_sent = 0
schemaModel = {"id":shipcall.id, "evaluation_notifications_sent":evaluation_notifications_sent} # #TODO: should this require the 'modified' tag to be adapted?
query = SQLQuery.get_shipcall_put(schemaModel)
# execute the PUT-Request
schemas = execute_sql_query_standalone(query=query, param=schemaModel, command_type="execute")
### Generate Notifications ###
# find all attached users of the shipcall (checks the history, then reads out the user ids and builds the users)
users = Notifier.get_users_via_history(shipcall_id=shipcall.id)
# for each user, identify the notification_types, which must be generated. Finally, create those
# notifications with a POST-request
Notifier.create_notifications_for_user_list(shipcall, users)
return
@staticmethod
def create_etaetd_string(eta, etd): # #TODO_rename: function name is improvable
eta = eta.strftime("%d.%m.%Y %H:%M") if eta is not None else None
etd = etd.strftime("%d.%m.%Y %H:%M") if etd is not None else None
eta_etd = ""
if eta is not None and etd is not None:
eta_etd = f"{eta} - {etd}"
if eta is not None and etd is None:
eta_etd = f"{eta}"
if etd is None and etd is not None:
eta_etd = f"{etd}"
return eta_etd
@staticmethod
def prepare_notification_body(shipcall:model.Shipcall):
# obtain the respective shipcall and ship
shipcall = execute_sql_query_standalone(query=SQLQuery.get_shipcall_by_id(), model=model.Shipcall, param={"id" : shipcall.id}, command_type="single")
ship = execute_sql_query_standalone(query=SQLQuery.get_ship_by_id(), model=model.Ship, param={"id" : shipcall.ship_id}, command_type="single")
# use ship & shipcall data models to prepare the body
ship_name = ship.name
eta_etd = Notifier.create_etaetd_string(shipcall.eta, shipcall.etd)
eta_etd_type = eta_etd_type_dict[model.ShipcallType(shipcall.type)]
evaluation_message = shipcall.evaluation_message
return (ship_name, evaluation_message, eta_etd, eta_etd_type)
@staticmethod
def shipcall_put_update_evaluation_notifications_sent_flag(shipcall):
# change the 'evaluation_notifications_sent' flag to 1
evaluation_notifications_sent = 1
schemaModel = {"id":shipcall.id, "evaluation_notifications_sent":evaluation_notifications_sent}
query = SQLQuery.get_shipcall_put(schemaModel)
schemas = execute_sql_query_standalone(query=query, param=schemaModel, command_type="execute")
return
@staticmethod
def build_email_targets_validation_notification(users)->list[str]:
# readout the email address of all users
email_tgts = [user.user_email for user in users if user.user_email is not None]
# additionally, always inform the BSMD
email_tgts.append("bremencalling@bsmd.de") # #TODO: for testing, use "bremencalling@bsmd.de". For live system, use "report@bsmd.de"
# #TODO_development: overwrite the recipients. Only send to 'bremencalling@bsmd.de' until the testing phase has succeeded.
email_tgts = ["bremencalling@bsmd.de" for tgt in email_tgts]
# avoid multi-mails, when (for some reason) multiple users share the same email address.
email_tgts = list(set(email_tgts))
return email_tgts
@staticmethod
def create_and_send_notification_mapper(notification_type:model.NotificationType, shipcall:model.Shipcall, eligible_users:list[model.User], email_handler:EmailHandler, mail_pwd:bytes, debug:bool=False):
# #TODO_refactor: instead create a method, which contains the 'distribution-logic' for all notification types
if int(notification_type)==int(model.NotificationType.email):
# create an Email and send it to each eligible_user.
# #TODO: this method must be a distributor. It should send emails for those, who want emails, and provide placeholders for other types of notifications
Notifier.create_and_send_email_notification(email_handler, mail_pwd, eligible_users, shipcall, debug=debug)
elif int(notification_type)==int(model.NotificationType.undefined): pass
elif int(notification_type)==int(model.NotificationType.push): pass
else: pass
return
@staticmethod
def create_and_send_email_notification(email_handler:EmailHandler, pwd:bytes, users:list[model.User], shipcall:model.Shipcall, debug:bool=False):
"""
# #TODO_rename: when there is more than one type of notification, this should be renamed. This method refers to a validation-state notification
this 'naive' method creates a message and simply sends it to all users in a list of users.
Afterwards, the database will be updated, so the shipcall no longer requires a notification.
"""
# get a list of all recipients
email_tgts = Notifier.build_email_targets_validation_notification(users)
# prepare and build the Email content
content = get_default_html_email()
files = [] # optional attachments
ship_name, evaluation_message, eta_etd, eta_etd_type = Notifier.prepare_notification_body(shipcall)
msg_multipart,msg_content = create_shipcall_evaluation_notification(
email_handler, ship_name, evaluation_message, eta_etd, eta_etd_type, content, files=files
)
# send the messages via smtlib's SSL functions
send_notification(email_handler, email_tgts, msg_multipart, pwd, debug=debug)
return
@staticmethod
def check_user_is_subscribed_to_notification_type(user,notification_type):
"""given a notification, one can check, whether the current user has subscribed to the respective notification_type. Returns a boolean"""
if int(notification_type) == int(model.NotificationType.email):
return user.notify_email
elif int(notification_type) == int(model.NotificationType.push):
return user.notify_popup
elif int(notification_type) == int(model.NotificationType.undefined):
pass
### placeholders:
#elif int(notification_type) == int(model.NotificationType.whatsapp):
#return user.notify_whatsapp
#elif int(notification_type) == int(model.NotificationType.signal):
#return user.notify_signal
else: # placeholder: whatsapp/signal
raise NotImplementedError(notification_type)
@staticmethod
def get_all_notification_types():
from BreCal.schemas import model
return list(model.NotificationType._member_map_.values())
"""# build the list of evaluation times ('now', as isoformat)"""
#evaluation_times = [datetime.datetime.now().isoformat() for _i in range(len(evaluation_states_new))]

View File

Binary file not shown.

After

Width:  |  Height:  |  Size: 20 KiB

View File

@ -0,0 +1,159 @@
<!doctype html>
<html lang="en">
<head>
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
<title>Simple Transactional Email</title>
<style media="all" type="text/css">
@media all {
.btn-primary table td:hover {
background-color: #ec0867 !important;
}
.btn-primary a:hover {
background-color: #ec0867 !important;
border-color: #ec0867 !important;
}
}
@media only screen and (max-width: 640px) {
.main p,
.main td,
.main span {
font-size: 16px !important;
}
.wrapper {
padding: 8px !important;
}
.content {
padding: 0 !important;
}
.container {
padding: 0 !important;
padding-top: 8px !important;
width: 100% !important;
}
.main {
border-left-width: 0 !important;
border-radius: 0 !important;
border-right-width: 0 !important;
}
.btn table {
max-width: 100% !important;
width: 100% !important;
}
.btn a {
font-size: 16px !important;
max-width: 100% !important;
width: 100% !important;
}
}
@media all {
.ExternalClass {
width: 100%;
}
.ExternalClass,
.ExternalClass p,
.ExternalClass span,
.ExternalClass font,
.ExternalClass td,
.ExternalClass div {
line-height: 100%;
}
.apple-link a {
color: inherit !important;
font-family: inherit !important;
font-size: inherit !important;
font-weight: inherit !important;
line-height: inherit !important;
text-decoration: none !important;
}
#MessageViewBody a {
color: inherit;
text-decoration: none;
font-size: inherit;
font-family: inherit;
font-weight: inherit;
line-height: inherit;
}
}
</style>
</head>
<body style="font-family: Helvetica, sans-serif; -webkit-font-smoothing: antialiased; font-size: 16px; line-height: 1.3; -ms-text-size-adjust: 100%; -webkit-text-size-adjust: 100%; background-color: #f4f5f6; margin: 0; padding: 0;">
<table role="presentation" border="0" cellpadding="0" cellspacing="0" class="body" style="border-collapse: separate; mso-table-lspace: 0pt; mso-table-rspace: 0pt; background-color: #f4f5f6; width: 100%;" width="100%" bgcolor="#f4f5f6">
<tr>
<td style="font-family: Helvetica, sans-serif; font-size: 16px; vertical-align: top;" valign="top">&nbsp;</td>
<td class="container" style="font-family: Helvetica, sans-serif; font-size: 16px; vertical-align: top; max-width: 600px; padding: 0; padding-top: 24px; width: 600px; margin: 0 auto;" width="600" valign="top">
<div class="content" style="box-sizing: border-box; display: block; margin: 0 auto; max-width: 600px; padding: 0;">
<!-- START CENTERED WHITE CONTAINER -->
<span class="preheader" style="color: transparent; display: none; height: 0; max-height: 0; max-width: 0; opacity: 0; overflow: hidden; mso-hide: all; visibility: hidden; width: 0;">Ein Schiffsanlauf benötigt Ihre Aufmerksamkeit.</span>
<table role="presentation" border="0" cellpadding="0" cellspacing="0" class="main" style="border-collapse: separate; mso-table-lspace: 0pt; mso-table-rspace: 0pt; background: #ffffff; border: 1px solid #eaebed; border-radius: 16px; width: 100%;" width="100%">
<!-- START MAIN CONTENT AREA -->
<div style="text-align: center;">
<img src="cid:LogoBremenCalling" height="100" width="100" alt="Bild kann nicht geladen werden." border="0" align="center">
</div>
<tr>
<td class="wrapper" style="font-family: Helvetica, sans-serif; font-size: 16px; vertical-align: top; box-sizing: border-box; padding: 24px;" valign="top">
<p style="font-family: Helvetica, sans-serif; font-size: 16px; font-weight: normal; margin: 0; margin-bottom: 16px;">Ahoi,</p>
<p style="font-family: Helvetica, sans-serif; font-size: 16px; font-weight: normal; margin: 0; margin-bottom: 16px;">ein Schiffsanlauf benötigt Ihre Aufmerksamkeit. Bei der Prüfung der Daten haben wir wahrgenommen, dass ein Problem aufgetreten sein könnte.</p>
<table role="presentation" border="0" cellpadding="0" cellspacing="0" class="btn btn-primary" style="border-collapse: separate; mso-table-lspace: 0pt; mso-table-rspace: 0pt; box-sizing: border-box; width: 100%; min-width: 100%;" width="100%">
<tbody>
<tr>
<td align="left" style="font-family: Helvetica, sans-serif; font-size: 16px; vertical-align: top; padding-bottom: 16px;" valign="top">
<table role="presentation" border="0" cellpadding="0" cellspacing="0" style="border-collapse: separate; mso-table-lspace: 0pt; mso-table-rspace: 0pt; width: auto;">
<tbody>
<tr>
<td style="font-family: Helvetica, sans-serif; font-size: 16px; vertical-align: top; border-radius: 4px; text-align: center; background-color: #0867ec;" valign="top" align="center" bgcolor="#0867ec"> <a href="https://bsmd.de/" target="_blank" style="border: solid 2px #0867ec; border-radius: 4px; box-sizing: border-box; cursor: pointer; display: inline-block; font-size: 16px; font-weight: bold; margin: 0; padding: 12px 24px; text-decoration: none; text-transform: capitalize; background-color: #0867ec; border-color: #0867ec; color: #ffffff;">Zu Bremen Calling</a> </td>
</tr>
</tbody>
</table>
</td>
</tr>
</tbody>
</table>
<p style="font-family: Helvetica, sans-serif; font-size: 16px; font-weight: normal; margin: 0; margin-bottom: 16px;">#ADAPTIVECONTENT</p>
<p style="font-family: Helvetica, sans-serif; font-size: 16px; font-weight: normal; margin: 0; margin-bottom: 16px;"><br>Falls es sich hierbei um eine Fehlmeldung handelt, Verzeihung. Wir sind stets interessiert daran, die Software zu verbessern. Senden Sie uns gerne eine <a href="mailto:bsmd@bsmd.de">Email</a>.</p>
</td>
</tr>
<!-- END MAIN CONTENT AREA -->
</table>
<!-- START FOOTER -->
<div class="footer" style="clear: both; padding-top: 24px; text-align: center; width: 100%;">
<table role="presentation" border="0" cellpadding="0" cellspacing="0" style="border-collapse: separate; mso-table-lspace: 0pt; mso-table-rspace: 0pt; width: 100%;" width="100%">
<tr>
<td class="content-block" style="font-family: Helvetica, sans-serif; vertical-align: top; color: #9a9ea6; font-size: 12px; text-align: center;" valign="top" align="center">
<span class="apple-link" style="color: #9a9ea6; font-size: 12px; text-align: center;">Bremer Schiffsmeldedienst, Kapt. P. Langbein e.K., Hafenkopf II / Überseetor 20, 28217 Bremen / Germany</span>
<br> Sie möchten keine Benachrichtigungen mehr erhalten? <a href="mailto:bsmd@bsmd.de" style="text-decoration: underline; color: #9a9ea6; font-size: 12px; text-align: center;">Hier abmelden</a>.
</td>
</tr>
<tr>
<td class="content-block powered-by" style="font-family: Helvetica, sans-serif; vertical-align: top; color: #9a9ea6; font-size: 12px; text-align: center;" valign="top" align="center">
Mail-Design by <a href="http://htmlemail.io" style="color: #9a9ea6; font-size:12px; text-align: center; text-decoration: none;">HTMLemail.io</a>
</td>
</tr>
</table>
</div>
<!-- END FOOTER -->
<!-- END CENTERED WHITE CONTAINER --></div>
</td>
<td style="font-family: Helvetica, sans-serif; font-size: 16px; vertical-align: top;" valign="top">&nbsp;</td>
</tr>
</table>
</body>
</html>

View File

@ -1,6 +1,8 @@
import os import os
import typing import typing
import datetime
import smtplib import smtplib
from socket import gaierror
from getpass import getpass from getpass import getpass
from email.message import EmailMessage from email.message import EmailMessage
import mimetypes import mimetypes
@ -42,7 +44,10 @@ class EmailHandler():
self.mail_port = mail_port self.mail_port = mail_port
self.mail_address = mail_address self.mail_address = mail_address
self.server = smtplib.SMTP_SSL(self.mail_server, self.mail_port) # alternatively, use smtplib.SMTP try:
self.server = smtplib.SMTP_SSL(self.mail_server, self.mail_port) # alternatively, use smtplib.SMTP
except gaierror:
raise Exception(f"'socket.gaierror' raised. This commonly happens, when there is no access to the server (e.g., by not having an internet connection)")
def check_state(self): def check_state(self):
"""check, whether the server login took place and is open.""" """check, whether the server login took place and is open."""
@ -226,4 +231,118 @@ class EmailHandler():
time.sleep(delete_after_s_seconds) time.sleep(delete_after_s_seconds)
if os.path.exists(temp_filename): if os.path.exists(temp_filename):
os.remove(temp_filename) os.remove(temp_filename)
return return
import typing
from email.mime.application import MIMEApplication
import mimetypes
import os
def find_warning_notification_email_template()->str:
"""
dynamically finds the 'default_email_template.txt' file within the module.
"""
# __file__ is BreCal/stubs/email_template.py
# parent of email_template.py is stubs
# parent of stubs is BreCal
brecal_root_folder = os.path.dirname(os.path.dirname(__file__)) # .../BreCal
resource_root_folder = os.path.join(brecal_root_folder, "resources") # .../BreCal/resources
html_filepath = os.path.join(resource_root_folder,"warning_notification_email_template.txt") # .../BreCal/resources/warning_notification_email_template.txt
assert os.path.exists(html_filepath), f"could not find default email template file at path: {html_filepath}"
return html_filepath
def get_default_html_email()->str:
"""
dynamically finds the 'default_email_template.txt' file within the module. It opens the file and returns the content.
__file__ returns to the file, where this function is stored (e.g., within BreCal.stubs.email_template)
using the dirname refers to the directory, where __file__ is stored.
finally, the 'default_email_template.txt' is stored within that folder
"""
html_filepath = find_warning_notification_email_template()
with open(html_filepath,"r", encoding="utf-8") as file: # encoding = "utf-8" allows for German Umlaute
content = file.read()
return content
def find_bremen_calling_logo():
"""
find the path towards the logo file (located at 'brecal\src\BreCalClient\Resources\logo_bremen_calling.png')
"""
# __file__ is services/email_handling.py
# parent of __file__ is services
# parent of services is BreCal
src_root_folder = os.path.dirname(os.path.dirname(__file__)) # .../BreCal
resource_root_folder = os.path.join(src_root_folder, "resources")
path = os.path.join(resource_root_folder, "logo_bremen_calling.png")
assert os.path.exists(path), f"cannot find logo of bremen calling at path: {os.path.abspath(path)}"
return path
def add_bremen_calling_logo(msg_multipart):
"""
The image is not attached automatically when it is embedded to the content. To circumvent this,
one commonly creates attachments, which are referred to in the email content.
The content body refers to 'LogoBremenCalling', which the 'Content-ID' of the logo is assigned as.
"""
path = find_bremen_calling_logo()
with open(path, 'rb') as file:
attachment = MIMEApplication(file.read(), _subtype=mimetypes.MimeTypes().guess_type(path), Name="bremen_calling.png")
attachment.add_header('Content-Disposition','attachment',filename=str(os.path.basename(path)))
attachment.add_header('Content-ID', '<LogoBremenCalling>')
msg_multipart.attach(attachment)
return msg_multipart
def create_shipcall_evaluation_notification(email_handler, ship_name:str, evaluation_message:str, eta_etd_str:str, eta_etd_type:str, content:str, files:typing.Optional[list[str]]):
"""
email_handler : EmailHandler. Contains meta-level information about the mail server and sender's Email.
ship_name : str. Name of the referenced ship, so the user knows the context.
evaluation_message : str. Brief description of the current evaluation state
eta_etd_str : str. Readable format of a datetime.datetime object, which is either ETA, ETD or both. Informs the user about when the shipcall is due.
eta_etd_type : str. Reference to the time, whether it arrives/leaves/shifts.
content : str (or filepath). Should refer to the template, which defines the content. This file contains HTML-structured text.
files: (optional). List of file paths, which are included as attachments.
"""
subject = f"{ship_name} (vorauss. {eta_etd_type}: {eta_etd_str})"
# create message_body
message_body = content # "Hello World."
evaluation_message_reformatted = evaluation_message.replace("\n", "<br>")
adaptive_content = f'<br>Betrifft: {ship_name} ({eta_etd_str})<font size="1"><br>{evaluation_message_reformatted}</font>'
message_body = message_body.replace("#ADAPTIVECONTENT", adaptive_content)
msg = email_handler.create_email(subject=subject, message_body=message_body, subtype="html")
msg_multipart = email_handler.translate_mail_to_multipart(msg=msg)
if files is not None:
for path in files:
assert os.path.exists(path), f"cannot find attachment at path: {path}"
email_handler.attach_file(path, msg=msg_multipart)
# add the bremen calling logo, which is referred to in the email body
msg_multipart = add_bremen_calling_logo(msg_multipart)
return (msg_multipart,content)
def send_notification(email_handler, email_tgts, msg, pwd, debug=False):
already_logged_in = email_handler.check_login()
if not already_logged_in:
email_handler.login(interactive=False, pwd=pwd)
try:
assert email_handler.check_login()
if not debug:
email_handler.send_email(msg, email_tgts)
else:
print(f"(send_notification INFO): debugging state. Would have sent an Email to: {email_tgts}")
finally:
if not already_logged_in:
email_handler.close()
return

View File

@ -51,7 +51,7 @@ def add_function_to_schedule__update_shipcalls(interval_in_minutes:int, options:
schedule.every(interval_in_minutes).minutes.do(UpdateShipcalls, **kwargs_) schedule.every(interval_in_minutes).minutes.do(UpdateShipcalls, **kwargs_)
return return
def add_function_to_schedule__send_notifications(vr, interval_in_minutes:int=10): def add_function_to_schedule__send_notifications(interval_in_minutes:int=15):
schedule.every(interval_in_minutes).minutes.do(Notifier.send_notifications) schedule.every(interval_in_minutes).minutes.do(Notifier.send_notifications)
return return
@ -65,8 +65,8 @@ def setup_schedule(update_shipcalls_interval_in_minutes:int=60):
# update the evaluation state in every recent shipcall # update the evaluation state in every recent shipcall
add_function_to_schedule__update_shipcalls(update_shipcalls_interval_in_minutes) add_function_to_schedule__update_shipcalls(update_shipcalls_interval_in_minutes)
# placeholder: create/send notifications # create/send notifications
# add_function_to_schedule__send_notifications(...) add_function_to_schedule__send_notifications(15)
return return

View File

@ -929,6 +929,9 @@ class ValidationRuleFunctions(ValidationRuleBaseFunctions):
query_time = times_agency.iloc[0].eta_berth query_time = times_agency.iloc[0].eta_berth
# count the number of times, where a times entry is very close to the query time (uses an internal threshold, such as 15 minutes) # count the number of times, where a times entry is very close to the query time (uses an internal threshold, such as 15 minutes)
if all_times_agency is None:
all_times_agency = self.sql_handler.get_times_for_agency(non_null_column="eta_berth")
counts = self.sql_handler.count_synchronous_shipcall_times(query_time, all_df_times=all_times_agency) counts = self.sql_handler.count_synchronous_shipcall_times(query_time, all_df_times=all_times_agency)
violation_state = counts > maximum_threshold violation_state = counts > maximum_threshold
@ -952,6 +955,9 @@ class ValidationRuleFunctions(ValidationRuleBaseFunctions):
times_agency = df_times.loc[df_times["participant_type"]==ParticipantType.AGENCY.value] times_agency = df_times.loc[df_times["participant_type"]==ParticipantType.AGENCY.value]
query_time = times_agency.iloc[0].etd_berth query_time = times_agency.iloc[0].etd_berth
if all_times_agency is None:
all_times_agency = self.sql_handler.get_times_for_agency(non_null_column="etd_berth")
# count the number of times, where a times entry is very close to the query time (uses an internal threshold, such as 15 minutes) # count the number of times, where a times entry is very close to the query time (uses an internal threshold, such as 15 minutes)
counts = self.sql_handler.count_synchronous_shipcall_times(query_time, all_df_times=all_times_agency) counts = self.sql_handler.count_synchronous_shipcall_times(query_time, all_df_times=all_times_agency)
violation_state = counts > maximum_threshold violation_state = counts > maximum_threshold

View File

@ -1,3 +1,4 @@
import typing
import copy import copy
import logging import logging
import re import re
@ -74,6 +75,9 @@ class ValidationRules(ValidationRuleFunctions):
"""apply 'evaluate_shipcall_from_df' to each individual shipcall in {shipcall_df}. Returns shipcall_df ('evaluation', 'evaluation_message', 'evaluation_time' and 'evaluation_notifications_sent' are updated)""" """apply 'evaluate_shipcall_from_df' to each individual shipcall in {shipcall_df}. Returns shipcall_df ('evaluation', 'evaluation_message', 'evaluation_time' and 'evaluation_notifications_sent' are updated)"""
evaluation_states_old = [state_old for state_old in shipcall_df.loc[:,"evaluation"]] evaluation_states_old = [state_old for state_old in shipcall_df.loc[:,"evaluation"]]
evaluation_states_old = [state_old if not pd.isna(state_old) else 0 for state_old in evaluation_states_old] evaluation_states_old = [state_old if not pd.isna(state_old) else 0 for state_old in evaluation_states_old]
evaluation_notifications_sent_old = [ens for ens in shipcall_df.loc[:,"evaluation_notifications_sent"]]
results = shipcall_df.apply(lambda x: self.evaluate_shipcall_from_df(x), axis=1).values # returns tuple (state, message) results = shipcall_df.apply(lambda x: self.evaluate_shipcall_from_df(x), axis=1).values # returns tuple (state, message)
# unbundle individual results. evaluation_states becomes an integer, violation # unbundle individual results. evaluation_states becomes an integer, violation
@ -82,6 +86,15 @@ class ValidationRules(ValidationRuleFunctions):
violations = [self.concise_evaluation_message_if_too_long(violation) for violation in violations] violations = [self.concise_evaluation_message_if_too_long(violation) for violation in violations]
# build the list of evaluation times ('now', as isoformat) # build the list of evaluation times ('now', as isoformat)
evaluation_time = self.get_notification_times(evaluation_states_new)
# build the list of 'evaluation_notifications_sent'. The value is 'False', when a notification should be created
evaluation_notifications_sent = self.get_notification_states(evaluation_states_old, evaluation_states_new, evaluation_notifications_sent_old)
shipcall_df.loc[:,"evaluation"] = evaluation_states_new
shipcall_df.loc[:,"evaluation_message"] = violations
shipcall_df.loc[:,"evaluation_time"] = evaluation_time
shipcall_df.loc[:,"evaluation_notifications_sent"] = evaluation_notifications_sent
#evaluation_time = self.get_notification_times(evaluation_states_new) #evaluation_time = self.get_notification_times(evaluation_states_new)
# build the list of 'evaluation_notifications_sent'. The value is 'False', when a notification should be created # build the list of 'evaluation_notifications_sent'. The value is 'False', when a notification should be created
@ -112,16 +125,24 @@ class ValidationRules(ValidationRuleFunctions):
"""this function should apply the ValidationRules to the respective .shipcall, in regards to .times""" """this function should apply the ValidationRules to the respective .shipcall, in regards to .times"""
return (StatusFlags.GREEN, False) # (state:str, should_notify:bool) return (StatusFlags.GREEN, False) # (state:str, should_notify:bool)
def determine_notification_state(self, state_old, state_new): def determine_notification_state(self, state_old, state_new, evaluation_notifications_sent)->typing.Optional[bool]:
""" """
this method determines state changes in the notification state. When the state increases, a user is notified about it. this method determines state changes in the notification state. When the state increases, a user is notified about it.
state order: (NONE = GREEN < YELLOW < RED) state order: (NONE = GREEN < YELLOW < RED)
If a notification shall be sent, this method returns False. If no notification shall be sent, this method returns None or the prior state.
The method *never* returns True, as it shall only be called on novel shipcalls.
args:
evaluation_notifications_sent: the PREVIOUS state (if any) of this boolean. When no notification is required, the prior bool is used (e.g., None, False, True).
""" """
previous_state = evaluation_notifications_sent
# identify a state increase # identify a state increase
should_notify = self.identify_notification_state_change(state_old=state_old, state_new=state_new) should_notify = self.identify_notification_state_change(state_old=state_old, state_new=state_new)
# when a state increases, a notification must be sent. Thereby, the field should be set to False ({evaluation_notifications_sent}) # when a state increases, a notification must be sent. Thereby, the field should be set to False ({evaluation_notifications_sent})
evaluation_notifications_sent = False if bool(should_notify) else None evaluation_notifications_sent = False if bool(should_notify) else previous_state
return evaluation_notifications_sent return evaluation_notifications_sent
def identify_notification_state_change(self, state_old, state_new) -> bool: def identify_notification_state_change(self, state_old, state_new) -> bool:
@ -147,13 +168,13 @@ class ValidationRules(ValidationRuleFunctions):
return int(state_new) > int(state_old) return int(state_new) > int(state_old)
def get_notification_times(self, evaluation_states_new)->list[datetime.datetime]: def get_notification_times(self, evaluation_states_new)->list[datetime.datetime]:
"""# build the list of evaluation times ('now', as isoformat)""" """# build the list of evaluation times ('now'-datetime)"""
evaluation_times = [datetime.datetime.now().isoformat() for _i in range(len(evaluation_states_new))] evaluation_times = [datetime.datetime.now() for _i in range(len(evaluation_states_new))] # .isoformat()
return evaluation_times return evaluation_times
def get_notification_states(self, evaluation_states_old, evaluation_states_new)->list[bool]: def get_notification_states(self, evaluation_states_old, evaluation_states_new, evaluation_notifications_sent_old)->list[typing.Optional[bool]]:
"""# build the list of 'evaluation_notifications_sent'. The value is 'False', when a notification should be created""" """# build the list of 'evaluation_notifications_sent'. The value is 'False', when a notification should be created and None, when not"""
evaluation_notifications_sent = [self.determine_notification_state(state_old=int(state_old), state_new=int(state_new)) for state_old, state_new in zip(evaluation_states_old, evaluation_states_new)] evaluation_notifications_sent = [self.determine_notification_state(state_old=int(state_old), state_new=int(state_new), evaluation_notifications_sent=ens) for state_old, state_new, ens in zip(evaluation_states_old, evaluation_states_new, evaluation_notifications_sent_old)]
return evaluation_notifications_sent return evaluation_notifications_sent

View File

@ -0,0 +1,21 @@
import pytest
from BreCal.notifications.accounts import mail_server, mail_port, mail_address, mail_pwd
def test_mail_server():
assert isinstance(mail_server, str)
assert not "@" in mail_server
return
def test_mail_port():
assert isinstance(mail_port, int)
return
def test_mail_address():
assert isinstance(mail_address, str)
assert "@" in mail_address
return
def test_mail_pwd():
assert isinstance(mail_pwd, bytes), f"must be a bytes-encoded password to protect the account"
return

View File

View File

@ -0,0 +1,14 @@
import pytest
import os
def test_find_bremen_calling_logo():
from BreCal.services.email_handling import find_bremen_calling_logo
path = find_bremen_calling_logo()
assert os.path.exists(path), f"cannot find the bremen calling logo file, which is needed for notifications (e.g., Email). Searched at path: \n\t{path}"
return
def test_find_warning_notification_email_template():
from BreCal.services.email_handling import find_warning_notification_email_template
path = find_warning_notification_email_template()
assert os.path.exists(path), f"cannot find the required email template, which is needed for warning notifications. Searched at path: \n\t{path}"
return