This commit is contained in:
scopesorting 2024-08-01 18:19:23 +02:00 committed by GitHub
commit 79ebf474fd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
23 changed files with 1081 additions and 38 deletions

View File

@ -11,6 +11,10 @@ bp = Blueprint('user', __name__)
@bp.route('/user', methods=['put'])
@auth_guard() # no restriction by role
def PutUser():
# #TODO: user validation should be extended by the notifications. When someone wants to set
# notify_email = 1, the email must be either present or part of the loadedModel
# notify_whatsapp = 1, there must be a phone number (same for notify_signal)
# notify_push = 1, there must be a phone number (#TODO_determine ... or an app-id? Unclear still)
try:
content = request.get_json(force=True)

View File

@ -33,7 +33,8 @@ def get_synchronous_shipcall_times_standalone(query_time:pd.Timestamp, all_df_ti
returns: counts
"""
assert isinstance(query_time,pd.Timestamp)
assert isinstance(query_time,pd.Timestamp) or pd.isnull(query_time), f"expected query_time to be a pd.Timestamp or pd.NaT. Found: {type(query_time)}"
assert isinstance(all_df_times,pd.DataFrame)
# get a timedelta for each valid (not Null) time entry
time_deltas_eta = [(query_time.to_pydatetime()-time_.to_pydatetime()) for time_ in all_df_times.loc[:,"eta_berth"] if not pd.isnull(time_)]
@ -439,4 +440,5 @@ class SQLHandler():
def count_synchronous_shipcall_times(self, query_time:pd.Timestamp, all_df_times:pd.DataFrame, delta_threshold=900)->int:
"""count all times entries, which are too close to the query_time. The {delta_threshold} determines the threshold. returns counts (int)"""
assert isinstance(all_df_times, pd.DataFrame)
return get_synchronous_shipcall_times_standalone(query_time, all_df_times, delta_threshold)

View File

@ -213,7 +213,7 @@ class SQLQuery():
@staticmethod
def get_history()->str:
query = "SELECT id, participant_id, shipcall_id, timestamp, eta, type, operation FROM history WHERE shipcall_id = ?shipcallid?"
query = "SELECT id, participant_id, shipcall_id, user_id, timestamp, eta, type, operation FROM history WHERE shipcall_id = ?shipcallid?"
return query
@staticmethod
@ -222,7 +222,7 @@ class SQLQuery():
"api_key, notify_email, notify_whatsapp, notify_signal, notify_popup, created, modified FROM user " +\
"WHERE user_name = ?username? OR user_email = ?username?"
return query
@staticmethod
def get_notifications()->str:
query = "SELECT id, shipcall_id, level, type, message, created, modified FROM notification " + \
@ -266,6 +266,11 @@ class SQLQuery():
query = "SELECT id, name, imo, callsign, participant_id, length, width, is_tug, bollard_pull, eni, created, modified, deleted FROM ship ORDER BY name"
return query
@staticmethod
def get_ship_by_id()->str:
query = "SELECT id, name, imo, callsign, participant_id, length, width, is_tug, bollard_pull, eni, created, modified, deleted FROM ship WHERE id = ?id?"
return query
@staticmethod
def get_times()->str:
query = "SELECT id, eta_berth, eta_berth_fixed, etd_berth, etd_berth_fixed, lock_time, lock_time_fixed, " + \
@ -321,6 +326,22 @@ class SQLQuery():
query = prefix+stage1+bridge+stage2+suffix
return query
@staticmethod
def get_notifications_post(schemaModel:dict)->str:
param_keys = {key:key for key in schemaModel.keys()}
prefix = "INSERT INTO notification ("
bridge = ") VALUES ("
suffix = ")"
non_dynamic_keys = ["id", "created", "modified"]
stage1 = ",".join([key for key in schemaModel.keys() if not key in non_dynamic_keys])
stage2 = ",".join([f"?{param_keys.get(key)}?" for key in schemaModel.keys() if not key in non_dynamic_keys])
query = prefix+stage1+bridge+stage2+suffix
return query
@staticmethod
def get_last_insert_id()->str:
@ -412,14 +433,14 @@ class SQLQuery():
@staticmethod
def get_notification_post()->str:
raise NotImplementedError()
raise NotImplementedError("skeleton")
# #TODO: this query is wrong and just a proxy for a POST request
query = "INSERT INTO shipcall_participant_map (shipcall_id, participant_id, type) VALUES (?shipcall_id?, ?participant_id?, ?type?)"
return query
@staticmethod
def get_shipcall_put_notification_state()->str:
raise NotImplementedError()
raise NotImplementedError("skeleton")
# #TODO: use evaluation_notifications_sent here and consider only the shipcall_id
# #TODO: query
query = ...

View File

@ -23,7 +23,7 @@ def GetHistory(options):
if "shipcall_id" in options and options["shipcall_id"]:
# query = SQLQuery.get_history()
# data = commands.query(query, model=History.from_query_row, param={"shipcallid" : options["shipcall_id"]})
data = commands.query("SELECT id, participant_id, shipcall_id, timestamp, eta, type, operation FROM history WHERE shipcall_id = ?shipcallid?",
data = commands.query("SELECT id, participant_id, shipcall_id, user_id, timestamp, eta, type, operation FROM history WHERE shipcall_id = ?shipcallid?",
model=History.from_query_row,
param={"shipcallid" : options["shipcall_id"]})

View File

@ -0,0 +1,7 @@
"""This file contains login information to register into distinct notification accounts."""
mail_server = 'w01d5503.kasserver.com'
mail_port=465
mail_address="max.metz@scope-sorting.com"
mail_pwd = b'gAAAAABmqJlkXbtJTL1tFiyQNHhF_Y7sgtVI0xEx07ybwbX70Ro1Vp73CLDq49eFSYG-1SswIDQ2JBSORYlWaR-Vh2kIwPHy_lX8SxkySrRvBRzkyZP5x0I='

View File

@ -0,0 +1,53 @@
import pandas as pd
from server.BreCal.database.enums import ParticipantType, ShipcallType, StatusFlags
#### Verbosity Functions ####
def get_default_header()->str:
# HEADER (greeting and default message)
header = "Dear Sir or Madam\n\nThank you for participating in the project 'Bremen Calling'. During analysis, our software has identified an event, which may be worth a second look. Here is the summary. \n\n"
return header
def get_default_footer()->str:
# FOOTER (signature)
footer = "\n\nWe would kindly ask you to have a look at the shipcall and verify, if any action is required from your side. \n\nKind regards\nThe 'Bremen Calling' Team"
return footer
def get_agency_name(sql_handler, times_df):
times_agency = times_df.loc[times_df["participant_type"]==ParticipantType.AGENCY.value,"participant_id"]
if len(times_agency)==0:
agency_name = ""
else:
agency_participant_id = times_agency.iloc[0]
agency_name = sql_handler.df_dict.get("participant").loc[agency_participant_id,"name"]
return agency_name
def get_ship_name(sql_handler, shipcall):
ship = sql_handler.df_dict.get("ship").loc[shipcall.ship_id]
ship_name = ship.loc["name"] # when calling ship.name, the ID is returned (pandas syntax)
return ship_name
def create_notification_body(sql_handler, times_df, shipcall, result)->str:
# #TODO: add 'Link zum Anlauf'
# URL: https://trello.com/c/qenZyJxR/75-als-bsmd-m%C3%B6chte-ich-%C3%BCber-gelbe-und-rote-ampeln-informiert-werden-um-die-systembeteiligung-zu-st%C3%A4rken
header = get_default_header()
footer = get_default_footer()
agency_name = get_agency_name(sql_handler, times_df)
ship_name = get_ship_name(sql_handler, shipcall)
verbosity_introduction = f"Respective Shipcall:\n"
traffic_state_verbosity = f"\tTraffic Light State: {StatusFlags(result[0]).name}\n"
ship_name_verbosity = f"\tShip: {ship_name} (the ship is {ShipcallType(shipcall.type).name.lower()})\n"
agency_name_verbosity = f"\tResponsible Agency: {agency_name}\n"
eta_verbosity = f"\tEstimated Arrival Time: {shipcall.eta.isoformat()}\n" if not pd.isna(shipcall.eta) else ""
etd_verbosity = f"\tEstimated Departure Time: {shipcall.etd.isoformat()}\n" if not pd.isna(shipcall.etd) else ""
error_verbosity = f"\nError Description:\n\t" + "\n\t".join(result[1])
message_body = "".join([header, verbosity_introduction, traffic_state_verbosity, ship_name_verbosity, agency_name_verbosity, eta_verbosity, etd_verbosity, error_verbosity, footer])
return message_body

View File

@ -0,0 +1,13 @@
import datetime
from BreCal.schemas.model import Notification
from BreCal.database.enums import NotificationType, StatusFlags
def create_notification(id, times_id, message, level, notification_type:NotificationType, created=None, modified=None):
# #TODO_determine: determine, whether this function is still in active use. The data-model seems outdated.
created = (datetime.datetime.now()).isoformat() or created
notification = Notification(
id=id,
times_id=times_id, acknowledged=False, level=level, type=notification_type.value, message=message, created=created, modified=modified
)
return notification

View File

@ -0,0 +1,535 @@
import typing
import datetime
from BreCal.database.sql_handler import execute_sql_query_standalone
from BreCal.database.sql_queries import SQLQuery
from BreCal.schemas import model
from BreCal.brecal_utils.time_handling import difference_to_then
from BreCal.notifications.accounts import mail_server, mail_port, mail_address, mail_pwd
from BreCal.services.email_handling import EmailHandler, create_shipcall_evaluation_notification, send_notification, get_default_html_email
from BreCal.database.enums import ParticipantwiseTimeDelta
eta_etd_type_dict = {
model.ShipcallType.arrival : "Ankunft",
model.ShipcallType.departure : "Abfahrt",
model.ShipcallType.shifting : "Wechselnd"
}
class Notifier():
"""
This class provides quick access to different notification functions.
Each method is callable without initializing the Notifier object.
Example:
Notifier.send_notifications(*args)
The Notifier has three main methods.
Notifier.send_notifications() --- can be called routinely. Identifies all candidates and notifies the users
Notifier.send_notification(shipcall) --- applies filters to identify, whether a notification is desired. If so, notifies the users
Notifer.create(user) --- 'naive' method, which simply creates a message and sends it to the user's preferred choice
# #TODO_determine: it makes sense to go one step finer. .create could produce messages and recipients, whereas .publish may then issue those and .document may update the SQL dataset
## naming idea: Notifier.send_notifications, Notifier.send_notification, Notifier.send (which may contain .create, .publish, .document)
"""
def __init__(self) -> None:
pass
@staticmethod
def send_notifications(is_test:bool=False) -> None:
"""
This method is used in BreCal.services.schedule_routines and will issue notifications, once they are due.
It is purposely defined in a way, where no external dependencies or arguments are required. The only exception is the
'is_test' boolean, as it prevents the notifications from being *actually* sent as part of the pytests.
Steps:
- get all shipcalls
- filter: consider only those, which are not yet sent (uses shipcall.evaluation_notifications_sent)
- iterate over each remaining shipcall and apply .send_notification
- those which are unsent, shall be sent by the respective type
"""
# set a threshold, when alarm event notifications become eligible
time_diff_threshold = float(ParticipantwiseTimeDelta.NOTIFICATION)*60 # m minutes, converted to seconds
debug = is_test # if is_test, the Emails will not be issued. Only a print message will be created.
update_database = True if not is_test else False # if_test, the database will not be updated.
time_diff_threshold = time_diff_threshold if not is_test else 0.0 # 0.0 delay when is_test is set.
email_handler = EmailHandler(mail_server=mail_server, mail_port=mail_port, mail_address=mail_address)
# get candidates: find all eligible shipcalls, where the evaluation state is yellow or red & the notifications are not yet sent
eligible_shipcalls = Notifier.get_eligible_shipcalls(time_diff_threshold)
# get a list of tuples, where (shipcall, users) are combined for easier iterations.
notification_instructions = Notifier.build_notification_tuples(eligible_shipcalls)
if len(notification_instructions)>0: # only perform a login when there are any notification_instructions
try:
# login in advance, so the email handler uses a shared connection. It disconnects only once at the end of the call.
email_handler.login(interactive=False, pwd=mail_pwd)
for shipcall, users in notification_instructions:
assert isinstance(shipcall, model.Shipcall)
assert isinstance(users,list)
assert all([isinstance(user,model.User) for user in users])
# iterate over each notification type
for notification_type in Notifier.get_all_notification_types():
# consider only those users, which have subscribed to the respective notification type
eligible_users = Notifier.get_eligible_users(users, notification_type)
if len(eligible_users)>0:
Notifier.create_and_send_notification_mapper(notification_type, shipcall, eligible_users, email_handler, mail_pwd, debug=debug)
if not debug:
# populate notifications within the database to keep track
Notifier.generate_notification(shipcall, notification_type)
# update the database entries, so notifications are only sent once.
if update_database:
Notifier.shipcall_put_update_evaluation_notifications_sent_flag(shipcall)
# #TODO: except... create log?
finally:
email_handler.close()
return
@staticmethod
def build_notification_tuples(eligible_shipcalls)->list[typing.Tuple[model.Shipcall,list[model.User]]]:
"""
creates tuples, where shipcall and the list of attached users are grouped. One can iterate over the tuples
to perform actions, such as issueing notifications
"""
notification_instructions = []
for shipcall in eligible_shipcalls:
# get all users, which are attached to the shipcall (uses the History dataset)
users = Notifier.get_users_via_history(shipcall.id)
# create and store tuples of (shipcall, users) for each eligible shipcall
notification_instructions.append((shipcall, users))
return notification_instructions
@staticmethod
def get_eligible_users(users, notification_type):
eligible_users = [user for user in users if Notifier.check_user_is_subscribed_to_notification_type(user,notification_type=notification_type)]
# filter: consider only those users, where an Email is set
# #TODO: this is Email-specific and should not be a filter for other notifications
eligible_users = [user for user in eligible_users if user.user_email is not None]
return eligible_users
@staticmethod
def create(shipcall_id, old_state, new_state, user, update_database:bool=False)->typing.Optional[model.Notification]:
"""
# #TODO_refactor: drastically change this method. It should only generate notifications, but not send them.
Standalone function, which creates a Notification for a specific user.
Steps:
- identify a list of notification_types, which shall be issued (based on the user's 'notify_*' settings)
- create messages based on the respective NotificationType, which the user has enabled
- send the messages
- update the shipcall dataset ('evaluation_notifications_sent')
args:
update_database: whether to update the MySQL database by posting the notification.
"""
assert user.id is not None
assert shipcall_id is not None
assert old_state is not None
assert new_state is not None
# get Shipcall by shipcall_id
shipcall = Notifier.get_shipcall(shipcall_id=shipcall_id)
"""
## TODO: this might simply be removed due to incorrect concept
## could also relocate this to the generation function, which identifies the notifications to be created
## should be unnecessary due to shipcall.evaluation_notifications_sent
# a) filter existing notifictions and consider only the dataset, where type (notification_type) and level (new_state) are suitable
notification_exists = Notifier.check_notification_type_and_level_exists(shipcall_id=shipcall_id, notification_type=notification_type, level=new_state, existing_notifications=existing_notifications)
if notification_exists:
return None
"""
# get a list of all subscribed notification types and track the state (success or failure)
raise NotImplementedError("skeleton")
successes = {}
notification_type_list = Notifier.build_notification_type_list(user)
for notification_type in notification_type_list:
# generate message based on the notification type
message = Notifier.generate_notification_message_by_type(notification_type, evaluation_message=shipcall.evaluation_message, user=user)
# send the message
success_state = Notifier.send_notification_by_type(notification_type, message)
successes[notification_type] = success_state
notification = ...
return notification
@staticmethod
def find_latest_notification(notifications:list[model.Notification])->typing.Optional[model.Notification]:
"""given a list of notification objects, this method returns the object, where the .created field corresponds to the *latest* notification object"""
latest_notification = sorted(notifications, key=lambda notification: notification.created, reverse=False)[-1] if len(notifications)>0 else None
return latest_notification
@staticmethod
def get_users_via_history(shipcall_id:int)->list[model.User]:
"""using the History objects, one can infer the user_id, which allows querying the Users"""
histories = execute_sql_query_standalone(query=SQLQuery.get_history(), param={"shipcallid" : shipcall_id}, model=model.History, command_type="query")
assert isinstance(histories,list)
assert all([isinstance(history,model.History) for history in histories])
users = [Notifier.get_user(history.user_id) for history in histories]
return users
@staticmethod
def get_user(user_id:int)->model.User:
"""Given a user_id, this method executes an SQL query to return a User"""
user = execute_sql_query_standalone(query=SQLQuery.get_user_by_id(), param={"id" : user_id}, model=model.User, command_type="single")
return user
@staticmethod
def get_shipcall(shipcall_id:int)->model.Shipcall:
"""Given a shipcall_id, this method executes an SQL query to return a Shipcall"""
shipcall = execute_sql_query_standalone(query=SQLQuery.get_shipcall_by_id(), param={"id" : shipcall_id}, model=model.Shipcall.from_query_row, command_type="single")
return shipcall
@staticmethod
def get_existing_notifications(shipcall_id:int)->list[model.Notification]:
existing_notifications = execute_sql_query_standalone(query=SQLQuery.get_notifications(), param={"scid" : shipcall_id}, model=model.Notification, command_type="query")
return existing_notifications
@staticmethod
def build_notification_type_list(user:model.User)->list[model.NotificationType]:
"""
based on a User, this method generates a list of notification types. These can be used as instructions to
generate the respective Notification datasets.
"""
notification_type_list = []
if user.notify_email:
notification_type_list.append(model.NotificationType.email)
if user.notify_popup:
notification_type_list.append(model.NotificationType.push)
if user.notify_whatsapp:
# currently not defined as a data model. Must be included / changed, once the data model of NotificationType is updated
notification_type_list.append(model.NotificationType.undefined)
if user.notify_signal:
# currently not defined as a data model. Must be included / changed, once the data model of NotificationType is updated
notification_type_list.append(model.NotificationType.undefined)
return notification_type_list
@staticmethod
def check_notification_type_and_level_exists(shipcall_id:int, notification_type:model.NotificationType, level:model.EvaluationType, existing_notifications:list[model.Notification])->bool:
"""This method checks, whether one of the Notification elements in the provided list is a perfect match to the arguments shipcall_id, notification_type and level"""
# #TODO_determine: should a notification be *skipped*, when there already is a dataset with
# identical level and type? ---> currently enabled.
# check, if any of the existing notifications is a perfect match for notification type & level & shipcall
matches = [note for note in existing_notifications if (int(level)==int(note.level)) and (int(note.type)==int(notification_type)) and (shipcall_id==note.shipcall_id)]
# bool: whether there is a perfect match
exists = len(matches)>0
raise Exception("deprecated")
return exists
@staticmethod
def check_higher_severity(old_state:model.EvaluationType, new_state:model.EvaluationType)->bool:
"""
determines, whether the observed state change should trigger a notification.
internally, this function maps StatusFlags to an integer and determines, if the successor state is more severe than the predecessor.
state changes trigger a notification in the following cases:
green -> yellow
green -> red
yellow -> red
(none -> yellow) or (none -> red)
due to the values in the enumeration objects, the states are mapped to provide this function.
green=1, yellow=2, red=3, none=1. Hence, critical changes can be observed by simply checking with "greater than".
returns bool, whether a notification should be triggered
"""
# undefined previous state: .undefined (0)
if old_state is None:
old_state = model.EvaluationType.undefined
# old_state is always considered at least .green (1) (hence, .undefined becomes .green)
old_state = max(int(old_state), model.EvaluationType.green)
# the IntEnum values are correctly sequenced. .red > .yellow > .green > .undefined
# as .undefined becomes .green, an old_state is always *at least* green.
severity_grew = int(new_state) > int(old_state)
return severity_grew
@staticmethod
def generate_notification_message_by_type(notification_type:model.NotificationType, evaluation_message:str, user:model.User):
assert isinstance(user, model.User)
if int(notification_type) == int(model.NotificationType.undefined):
raise NotImplementedError("skeleton")
elif int(notification_type) == int(model.NotificationType.email):
raise NotImplementedError("skeleton")
elif int(notification_type) == int(model.NotificationType.push):
raise NotImplementedError("skeleton")
#elif int(notification_type) == int(model.NotificationType.whatsapp):
#raise NotImplementedError("skeleton")
#elif int(notification_type) == int(model.NotificationType.signal):
#raise NotImplementedError("skeleton")
elif int(notification_type) == int(model.NotificationType.undefined):
raise NotImplementedError("skeleton")
else:
raise ValueError(notification_type)
return
@staticmethod
def get_eligible_shipcalls(time_diff_threshold:float):
"""
get all eligible shipcalls, which do not have a sent notification yet
criterion a) notification shall not be sent yet (evaluation_notifications_sent = 0)
criterion b) evaluation state is yellow or red (type 2 or 3)
"""
query = 'SELECT * FROM shipcall WHERE (evaluation_notifications_sent = ?evaluation_notifications_sent?) AND (evaluation = 2 OR evaluation = 3)'
evaluation_notifications_sent = 0
# filter out the shipcalls, where the notifications were already sent (ignores null values, which is as expected)
eligible_shipcalls = execute_sql_query_standalone(query=query, model=model.Shipcall, param={"evaluation_notifications_sent" : evaluation_notifications_sent})
# filter out the shipcalls, where evaluation_time is not set
eligible_shipcalls = [shipcall for shipcall in eligible_shipcalls if shipcall.evaluation_time is not None]
# filter out the shipcalls, where the evaluation_time is too recent. We expect a minimum difference, so user input errors do not cause notifications
eligible_shipcalls = [shipcall for shipcall in eligible_shipcalls if Notifier.check_shipcall_evaluation_time_exceeds_minimum_time_difference(shipcall, time_diff_threshold)]
return eligible_shipcalls
@staticmethod
def get_eligible_notifications_of_shipcall(shipcall:model.Shipcall, time_diff_threshold:float)->list[model.Notification]:
"""obtain all notifications, which belong to the shipcall id"""
query = SQLQuery.get_notifications()
eligible_notifications = execute_sql_query_standalone(query=query, model=model.Notification, param={"scid" : shipcall.id})
eligible_notifications = [notification for notification in eligible_notifications if Notifier.check_notification_level_matches_shipcall_entry(notification, shipcall)]
return eligible_notifications
@staticmethod
def check_shipcall_evaluation_time_exceeds_minimum_time_difference(shipcall:model.Shipcall, time_diff_threshold:float):
"""
a notification may only be sent, when the created notification has been created or modified {time_diff_threshold} seconds ago.
"""
assert (shipcall.evaluation_time is not None), f"must provide 'evaluation_time'"
return difference_to_then(shipcall.evaluation_time)>=time_diff_threshold
@staticmethod
def check_notification_level_matches_shipcall_entry(notification, shipcall):
"""
a notification may only be sent, when the shipcall entry matches the notification level.
otherwise, a user may have adapted the shipcall in the mean-time, so a notification would no longer be useful.
"""
return int(shipcall.evaluation) == int(notification.level)
@staticmethod
def get_eligible_notifications(shipcalls:list[model.Shipcall]):
"""obtain a list of all notifications of each element of the shipcall list."""
eligible_notifications = []
for shipcall in shipcalls:
eligible_notification = Notifier.get_eligible_notifications_of_shipcall(shipcall)
eligible_notifications.extend(eligible_notification)
raise NotImplementedError("refactoring!")
return eligible_notifications
@staticmethod
def create_notifications_for_user_list(shipcall, users:list[model.User]):
raise NotImplementedError("deprecated")
notification_type_list = []
for user in users:
user_notification_type_list = Notifier.build_notification_type_list(user)
notification_type_list.extend(user_notification_type_list)
# get the unique notification types
notification_type_list = list(set(notification_type_list))
for notification_type in notification_type_list:
schemaModel = dict(shipcall_id = shipcall.id, level = int(shipcall.evaluation), type = notification_type, message = "", created = datetime.datetime.now(), modified=None)
query = SQLQuery.get_notifications_post(schemaModel)
schemas = execute_sql_query_standalone(query=query, param=schemaModel, command_type="execute")
return
@staticmethod
def generate_notification(shipcall:model.Shipcall, notification_type:model.NotificationType):
"""
This one-line method creates a notification for the provided shipcall and notification type
"""
schemaModel = dict(
shipcall_id = shipcall.id,
level = int(shipcall.evaluation),
type = notification_type,
message = "", # #TODO_messsage: what should be stored here? The HTML-template appears to be too long.
created = datetime.datetime.now(),
modified=None)
query = SQLQuery.get_notifications_post(schemaModel)
schemas = execute_sql_query_standalone(query=query, param=schemaModel, command_type="execute")
return
@staticmethod
def DEPRECATED_generate_notifications(shipcall_id):
"""
# #TODO_delete: this method can be safely removed after 15.08.2024
This one-line method creates all notifications for the provided shipcall id. It does so by obtaining the shipcall,
looking up its history, and finding all attached users.
For each user, a notification will be created for each subscribed notification type (e.g., Email)
"""
shipcall = Notifier.get_shipcall(shipcall_id)
notifications = execute_sql_query_standalone(query=SQLQuery.get_notifications(), param={"scid" : shipcall_id}, model=model.Notification, command_type="query")
latest_notification = Notifier.find_latest_notification(notifications)
old_state = model.EvaluationType(latest_notification.level) if latest_notification is not None else model.EvaluationType.undefined
new_state = shipcall.evaluation
# identify, whether the severity of the shipcall has increased to see, whether a notification is required
severity_increase = Notifier.check_higher_severity(old_state=old_state, new_state=new_state)
# when the severity increases, set the 'evaluation_notifications_sent' argument to 0 (False)
if severity_increase:
### UPDATE Shipcall ###
# prepare and create a query
evaluation_notifications_sent = 0
schemaModel = {"id":shipcall.id, "evaluation_notifications_sent":evaluation_notifications_sent} # #TODO: should this require the 'modified' tag to be adapted?
query = SQLQuery.get_shipcall_put(schemaModel)
# execute the PUT-Request
schemas = execute_sql_query_standalone(query=query, param=schemaModel, command_type="execute")
### Generate Notifications ###
# find all attached users of the shipcall (checks the history, then reads out the user ids and builds the users)
users = Notifier.get_users_via_history(shipcall_id=shipcall.id)
# for each user, identify the notification_types, which must be generated. Finally, create those
# notifications with a POST-request
Notifier.create_notifications_for_user_list(shipcall, users)
return
@staticmethod
def create_etaetd_string(eta, etd): # #TODO_rename: function name is improvable
eta = eta.strftime("%d.%m.%Y %H:%M") if eta is not None else None
etd = etd.strftime("%d.%m.%Y %H:%M") if etd is not None else None
eta_etd = ""
if eta is not None and etd is not None:
eta_etd = f"{eta} - {etd}"
if eta is not None and etd is None:
eta_etd = f"{eta}"
if etd is None and etd is not None:
eta_etd = f"{etd}"
return eta_etd
@staticmethod
def prepare_notification_body(shipcall:model.Shipcall):
# obtain the respective shipcall and ship
shipcall = execute_sql_query_standalone(query=SQLQuery.get_shipcall_by_id(), model=model.Shipcall, param={"id" : shipcall.id}, command_type="single")
ship = execute_sql_query_standalone(query=SQLQuery.get_ship_by_id(), model=model.Ship, param={"id" : shipcall.ship_id}, command_type="single")
# use ship & shipcall data models to prepare the body
ship_name = ship.name
eta_etd = Notifier.create_etaetd_string(shipcall.eta, shipcall.etd)
eta_etd_type = eta_etd_type_dict[model.ShipcallType(shipcall.type)]
evaluation_message = shipcall.evaluation_message
return (ship_name, evaluation_message, eta_etd, eta_etd_type)
@staticmethod
def shipcall_put_update_evaluation_notifications_sent_flag(shipcall):
# change the 'evaluation_notifications_sent' flag to 1
evaluation_notifications_sent = 1
schemaModel = {"id":shipcall.id, "evaluation_notifications_sent":evaluation_notifications_sent}
query = SQLQuery.get_shipcall_put(schemaModel)
schemas = execute_sql_query_standalone(query=query, param=schemaModel, command_type="execute")
return
@staticmethod
def build_email_targets_validation_notification(users)->list[str]:
# readout the email address of all users
email_tgts = [user.user_email for user in users if user.user_email is not None]
# additionally, always inform the BSMD
email_tgts.append("bremencalling@bsmd.de") # #TODO: for testing, use "bremencalling@bsmd.de". For live system, use "report@bsmd.de"
# #TODO_development: overwrite the recipients. Only send to 'bremencalling@bsmd.de' until the testing phase has succeeded.
email_tgts = ["bremencalling@bsmd.de" for tgt in email_tgts]
# avoid multi-mails, when (for some reason) multiple users share the same email address.
email_tgts = list(set(email_tgts))
return email_tgts
@staticmethod
def create_and_send_notification_mapper(notification_type:model.NotificationType, shipcall:model.Shipcall, eligible_users:list[model.User], email_handler:EmailHandler, mail_pwd:bytes, debug:bool=False):
# #TODO_refactor: instead create a method, which contains the 'distribution-logic' for all notification types
if int(notification_type)==int(model.NotificationType.email):
# create an Email and send it to each eligible_user.
# #TODO: this method must be a distributor. It should send emails for those, who want emails, and provide placeholders for other types of notifications
Notifier.create_and_send_email_notification(email_handler, mail_pwd, eligible_users, shipcall, debug=debug)
elif int(notification_type)==int(model.NotificationType.undefined): pass
elif int(notification_type)==int(model.NotificationType.push): pass
else: pass
return
@staticmethod
def create_and_send_email_notification(email_handler:EmailHandler, pwd:bytes, users:list[model.User], shipcall:model.Shipcall, debug:bool=False):
"""
# #TODO_rename: when there is more than one type of notification, this should be renamed. This method refers to a validation-state notification
this 'naive' method creates a message and simply sends it to all users in a list of users.
Afterwards, the database will be updated, so the shipcall no longer requires a notification.
"""
# get a list of all recipients
email_tgts = Notifier.build_email_targets_validation_notification(users)
# prepare and build the Email content
content = get_default_html_email()
files = [] # optional attachments
ship_name, evaluation_message, eta_etd, eta_etd_type = Notifier.prepare_notification_body(shipcall)
msg_multipart,msg_content = create_shipcall_evaluation_notification(
email_handler, ship_name, evaluation_message, eta_etd, eta_etd_type, content, files=files
)
# send the messages via smtlib's SSL functions
send_notification(email_handler, email_tgts, msg_multipart, pwd, debug=debug)
return
@staticmethod
def check_user_is_subscribed_to_notification_type(user,notification_type):
"""given a notification, one can check, whether the current user has subscribed to the respective notification_type. Returns a boolean"""
if int(notification_type) == int(model.NotificationType.email):
return user.notify_email
elif int(notification_type) == int(model.NotificationType.push):
return user.notify_popup
elif int(notification_type) == int(model.NotificationType.undefined):
pass
### placeholders:
#elif int(notification_type) == int(model.NotificationType.whatsapp):
#return user.notify_whatsapp
#elif int(notification_type) == int(model.NotificationType.signal):
#return user.notify_signal
else: # placeholder: whatsapp/signal
raise NotImplementedError(notification_type)
@staticmethod
def get_all_notification_types():
from BreCal.schemas import model
return list(model.NotificationType._member_map_.values())

View File

Binary file not shown.

After

Width:  |  Height:  |  Size: 20 KiB

View File

@ -0,0 +1,159 @@
<!doctype html>
<html lang="en">
<head>
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
<title>Simple Transactional Email</title>
<style media="all" type="text/css">
@media all {
.btn-primary table td:hover {
background-color: #ec0867 !important;
}
.btn-primary a:hover {
background-color: #ec0867 !important;
border-color: #ec0867 !important;
}
}
@media only screen and (max-width: 640px) {
.main p,
.main td,
.main span {
font-size: 16px !important;
}
.wrapper {
padding: 8px !important;
}
.content {
padding: 0 !important;
}
.container {
padding: 0 !important;
padding-top: 8px !important;
width: 100% !important;
}
.main {
border-left-width: 0 !important;
border-radius: 0 !important;
border-right-width: 0 !important;
}
.btn table {
max-width: 100% !important;
width: 100% !important;
}
.btn a {
font-size: 16px !important;
max-width: 100% !important;
width: 100% !important;
}
}
@media all {
.ExternalClass {
width: 100%;
}
.ExternalClass,
.ExternalClass p,
.ExternalClass span,
.ExternalClass font,
.ExternalClass td,
.ExternalClass div {
line-height: 100%;
}
.apple-link a {
color: inherit !important;
font-family: inherit !important;
font-size: inherit !important;
font-weight: inherit !important;
line-height: inherit !important;
text-decoration: none !important;
}
#MessageViewBody a {
color: inherit;
text-decoration: none;
font-size: inherit;
font-family: inherit;
font-weight: inherit;
line-height: inherit;
}
}
</style>
</head>
<body style="font-family: Helvetica, sans-serif; -webkit-font-smoothing: antialiased; font-size: 16px; line-height: 1.3; -ms-text-size-adjust: 100%; -webkit-text-size-adjust: 100%; background-color: #f4f5f6; margin: 0; padding: 0;">
<table role="presentation" border="0" cellpadding="0" cellspacing="0" class="body" style="border-collapse: separate; mso-table-lspace: 0pt; mso-table-rspace: 0pt; background-color: #f4f5f6; width: 100%;" width="100%" bgcolor="#f4f5f6">
<tr>
<td style="font-family: Helvetica, sans-serif; font-size: 16px; vertical-align: top;" valign="top">&nbsp;</td>
<td class="container" style="font-family: Helvetica, sans-serif; font-size: 16px; vertical-align: top; max-width: 600px; padding: 0; padding-top: 24px; width: 600px; margin: 0 auto;" width="600" valign="top">
<div class="content" style="box-sizing: border-box; display: block; margin: 0 auto; max-width: 600px; padding: 0;">
<!-- START CENTERED WHITE CONTAINER -->
<span class="preheader" style="color: transparent; display: none; height: 0; max-height: 0; max-width: 0; opacity: 0; overflow: hidden; mso-hide: all; visibility: hidden; width: 0;">Ein Schiffsanlauf benötigt Ihre Aufmerksamkeit.</span>
<table role="presentation" border="0" cellpadding="0" cellspacing="0" class="main" style="border-collapse: separate; mso-table-lspace: 0pt; mso-table-rspace: 0pt; background: #ffffff; border: 1px solid #eaebed; border-radius: 16px; width: 100%;" width="100%">
<!-- START MAIN CONTENT AREA -->
<div style="text-align: center;">
<img src="cid:LogoBremenCalling" height="100" width="100" alt="Bild kann nicht geladen werden." border="0" align="center">
</div>
<tr>
<td class="wrapper" style="font-family: Helvetica, sans-serif; font-size: 16px; vertical-align: top; box-sizing: border-box; padding: 24px;" valign="top">
<p style="font-family: Helvetica, sans-serif; font-size: 16px; font-weight: normal; margin: 0; margin-bottom: 16px;">Ahoi,</p>
<p style="font-family: Helvetica, sans-serif; font-size: 16px; font-weight: normal; margin: 0; margin-bottom: 16px;">ein Schiffsanlauf benötigt Ihre Aufmerksamkeit. Bei der Prüfung der Daten haben wir wahrgenommen, dass ein Problem aufgetreten sein könnte.</p>
<table role="presentation" border="0" cellpadding="0" cellspacing="0" class="btn btn-primary" style="border-collapse: separate; mso-table-lspace: 0pt; mso-table-rspace: 0pt; box-sizing: border-box; width: 100%; min-width: 100%;" width="100%">
<tbody>
<tr>
<td align="left" style="font-family: Helvetica, sans-serif; font-size: 16px; vertical-align: top; padding-bottom: 16px;" valign="top">
<table role="presentation" border="0" cellpadding="0" cellspacing="0" style="border-collapse: separate; mso-table-lspace: 0pt; mso-table-rspace: 0pt; width: auto;">
<tbody>
<tr>
<td style="font-family: Helvetica, sans-serif; font-size: 16px; vertical-align: top; border-radius: 4px; text-align: center; background-color: #0867ec;" valign="top" align="center" bgcolor="#0867ec"> <a href="https://bsmd.de/" target="_blank" style="border: solid 2px #0867ec; border-radius: 4px; box-sizing: border-box; cursor: pointer; display: inline-block; font-size: 16px; font-weight: bold; margin: 0; padding: 12px 24px; text-decoration: none; text-transform: capitalize; background-color: #0867ec; border-color: #0867ec; color: #ffffff;">Zu Bremen Calling</a> </td>
</tr>
</tbody>
</table>
</td>
</tr>
</tbody>
</table>
<p style="font-family: Helvetica, sans-serif; font-size: 16px; font-weight: normal; margin: 0; margin-bottom: 16px;">#ADAPTIVECONTENT</p>
<p style="font-family: Helvetica, sans-serif; font-size: 16px; font-weight: normal; margin: 0; margin-bottom: 16px;"><br>Falls es sich hierbei um eine Fehlmeldung handelt, Verzeihung. Wir sind stets interessiert daran, die Software zu verbessern. Senden Sie uns gerne eine <a href="mailto:bsmd@bsmd.de">Email</a>.</p>
</td>
</tr>
<!-- END MAIN CONTENT AREA -->
</table>
<!-- START FOOTER -->
<div class="footer" style="clear: both; padding-top: 24px; text-align: center; width: 100%;">
<table role="presentation" border="0" cellpadding="0" cellspacing="0" style="border-collapse: separate; mso-table-lspace: 0pt; mso-table-rspace: 0pt; width: 100%;" width="100%">
<tr>
<td class="content-block" style="font-family: Helvetica, sans-serif; vertical-align: top; color: #9a9ea6; font-size: 12px; text-align: center;" valign="top" align="center">
<span class="apple-link" style="color: #9a9ea6; font-size: 12px; text-align: center;">Bremer Schiffsmeldedienst, Kapt. P. Langbein e.K., Hafenkopf II / Überseetor 20, 28217 Bremen / Germany</span>
<br> Sie möchten keine Benachrichtigungen mehr erhalten? <a href="mailto:bsmd@bsmd.de" style="text-decoration: underline; color: #9a9ea6; font-size: 12px; text-align: center;">Hier abmelden</a>.
</td>
</tr>
<tr>
<td class="content-block powered-by" style="font-family: Helvetica, sans-serif; vertical-align: top; color: #9a9ea6; font-size: 12px; text-align: center;" valign="top" align="center">
Mail-Design by <a href="http://htmlemail.io" style="color: #9a9ea6; font-size:12px; text-align: center; text-decoration: none;">HTMLemail.io</a>
</td>
</tr>
</table>
</div>
<!-- END FOOTER -->
<!-- END CENTERED WHITE CONTAINER --></div>
</td>
<td style="font-family: Helvetica, sans-serif; font-size: 16px; vertical-align: top;" valign="top">&nbsp;</td>
</tr>
</table>
</body>
</html>

View File

@ -85,10 +85,11 @@ class ShipcallType(IntEnum):
@dataclass
class History:
def __init__(self, id, participant_id, shipcall_id, timestamp, eta, type, operation):
def __init__(self, id, participant_id, shipcall_id, user_id, timestamp, eta, type, operation):
self.id = id
self.participant_id = participant_id
self.shipcall_id = shipcall_id
self.user_id = user_id
self.timestamp = timestamp
self.eta = eta
self.type = type
@ -98,6 +99,7 @@ class History:
id: int
participant_id: int
shipcall_id: int
user_id: int
timestamp: datetime
eta: datetime
type: ObjectType
@ -108,6 +110,7 @@ class History:
"id": self.id,
"participant_id": self.participant_id,
"shipcall_id": self.shipcall_id,
"user_id": self.user_id,
"timestamp": self.timestamp.isoformat() if self.timestamp else "",
"eta": self.eta.isoformat() if self.eta else "",
"type": self.type.name,
@ -115,8 +118,8 @@ class History:
}
@classmethod
def from_query_row(self, id, participant_id, shipcall_id, timestamp, eta, type, operation):
return self(id, participant_id, shipcall_id, timestamp, eta, ObjectType(type), OperationType(operation))
def from_query_row(self, id, participant_id, shipcall_id, user_id, timestamp, eta, type, operation):
return self(id, participant_id, shipcall_id, user_id, timestamp, eta, ObjectType(type), OperationType(operation))
class Error(Schema):
message = fields.String(metadata={'required':True})
@ -135,7 +138,7 @@ class Notification:
"""
id: int
shipcall_id: int # 'shipcall record that caused the notification'
level: int # 'severity of the notification'
level: int # 'severity of the notification'. #TODO_determine: Should this be identical to EvaluationType?
type: NotificationType # 'type of the notification'
message: str # 'individual message'
created: datetime
@ -503,10 +506,10 @@ class User:
user_phone: str
password_hash: str
api_key: str
notify_email: bool # #TODO_clarify: should we use an IntFlag for multi-assignment?
notify_whatsapp: bool # #TODO_clarify: should we use an IntFlag for multi-assignment?
notify_signal: bool # #TODO_clarify: should we use an IntFlag for multi-assignment?
notify_popup: bool # #TODO_clarify: should we use an IntFlag for multi-assignment?
notify_email: bool
notify_whatsapp: bool
notify_signal: bool
notify_popup: bool
created: datetime
modified: datetime

View File

@ -1,6 +1,8 @@
import os
import typing
import datetime
import smtplib
from socket import gaierror
from getpass import getpass
from email.message import EmailMessage
import mimetypes
@ -12,6 +14,12 @@ import email
from email.mime.multipart import MIMEMultipart
from email.mime.application import MIMEApplication
import subprocess
import sys
import time
from tempfile import NamedTemporaryFile
import json
from cryptography.fernet import Fernet
class EmailHandler():
"""
@ -36,7 +44,10 @@ class EmailHandler():
self.mail_port = mail_port
self.mail_address = mail_address
self.server = smtplib.SMTP_SSL(self.mail_server, self.mail_port) # alternatively, SMTP
try:
self.server = smtplib.SMTP_SSL(self.mail_server, self.mail_port) # alternatively, use smtplib.SMTP
except gaierror:
raise Exception(f"'socket.gaierror' raised. This commonly happens, when there is no access to the server (e.g., by not having an internet connection)")
def check_state(self):
"""check, whether the server login took place and is open."""
@ -59,7 +70,7 @@ class EmailHandler():
user = self.server.__dict__.get("user",None)
return user is not None
def login(self, interactive:bool=True):
def login(self, interactive:bool=True, pwd=typing.Optional[bytes]):
"""
login on the determined mail server's mail address. By default, this function opens an interactive window to
type the password without echoing (printing '*******' instead of readable characters).
@ -71,19 +82,30 @@ class EmailHandler():
(status_code, status_msg) = self.server.login(self.mail_address, password=getpass())
else:
# fernet + password file
raise NotImplementedError()
assert pwd is not None, f"when non-interactive login is selected, one must provide a password"
assert isinstance(pwd, bytes), "please provide only byte-encrypted secure passwords. Those should be Fernet encoded."
fernet_key_path = os.path.join(os.path.expanduser("~"), "secure", "email_login_fernet_key.json")
assert os.path.exists(fernet_key_path), f"cannot find fernet key file at path: {fernet_key_path}"
with open(fernet_key_path, "r") as jr:
json_content = json.load(jr)
assert "fernet_key" in json_content
key = json_content.get("fernet_key").encode("utf8")
(status_code, status_msg) = self.server.login(self.mail_address, password=Fernet(key).decrypt(pwd).decode())
return (status_code, status_msg) # should be: (235, b'2.7.0 Authentication successful')
def create_email(self, subject:str, message_body:str)->EmailMessage:
def create_email(self, subject:str, message_body:str, subtype:typing.Optional[str]=None, sender_address:typing.Optional[str]=None)->EmailMessage:
"""
Create an EmailMessage object, which contains the Email's header ("Subject"), content ("Message Body") and the sender's address ("From").
The EmailMessage object does not contain the recipients yet, as these will be defined upon sending the Email.
"""
msg = EmailMessage()
msg["Subject"] = subject
msg["From"] = self.mail_address
msg["From"] = self.mail_address if sender_address is None else sender_address
#msg["To"] = email_tgts # will be defined in self.send_email
msg.set_content(message_body)
msg.set_content(message_body, subtype=subtype) if subtype is not None else msg.set_content(message_body, subtype=subtype)
return msg
def build_recipients(self, email_tgts:list[str]):
@ -172,3 +194,155 @@ class EmailHandler():
self.server.quit()
return
def preview_html_content(self, html:str, delete_after_s_seconds:typing.Optional[float]=None, file_path_dict:dict={}):
"""
Given an HTML-formatted text string, this method creates a temporary .html file and
spawns the local default webbrowser to preview the content.
This method is useful to design or debug HTML files before sending them via the EmailHandler.
When providing a floating point to the 'delete_after_s_seconds' argument, the temporary file will be
automatically removed after those seconds. The python script is blocked for the duration (using time.sleep)
args:
file_path_dict:
it is common to refer to images via 'cid:FILE_ID' within the HTML content. The preview cannot
display this, as the attached files are missing. To circumvent this, one can provide a dictionary, which
replaced the referred key
(e.g., 'cid:FILE_ID')
with the actual path, such as a logo or remote absolute path
(e.g., 'file:///C:/Users/User/brecal/misc/logo_bremen_calling.png')
Inspired by: https://stackoverflow.com/questions/53452322/is-there-a-way-that-i-can-preview-my-html-file
User: https://stackoverflow.com/users/355230/martineau
"""
for k, v in file_path_dict.items():
html = html.replace(k, v)
with NamedTemporaryFile(mode='wt', suffix='.html', delete=False, encoding="utf-8") as temp_file:
temp_file.write(html)
temp_filename = temp_file.name # Save temp file's name.
command = f"{sys.executable} -m webbrowser -n {temp_filename}"
browser = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if delete_after_s_seconds is not None:
assert isinstance(delete_after_s_seconds, float)
time.sleep(delete_after_s_seconds)
if os.path.exists(temp_filename):
os.remove(temp_filename)
return
import typing
from email.mime.application import MIMEApplication
import mimetypes
import os
def find_warning_notification_email_template()->str:
"""
dynamically finds the 'default_email_template.txt' file within the module.
"""
# __file__ is BreCal/stubs/email_template.py
# parent of email_template.py is stubs
# parent of stubs is BreCal
brecal_root_folder = os.path.dirname(os.path.dirname(__file__)) # .../BreCal
resource_root_folder = os.path.join(brecal_root_folder, "resources") # .../BreCal/resources
html_filepath = os.path.join(resource_root_folder,"warning_notification_email_template.txt") # .../BreCal/resources/warning_notification_email_template.txt
assert os.path.exists(html_filepath), f"could not find default email template file at path: {html_filepath}"
return html_filepath
def get_default_html_email()->str:
"""
dynamically finds the 'default_email_template.txt' file within the module. It opens the file and returns the content.
__file__ returns to the file, where this function is stored (e.g., within BreCal.stubs.email_template)
using the dirname refers to the directory, where __file__ is stored.
finally, the 'default_email_template.txt' is stored within that folder
"""
html_filepath = find_warning_notification_email_template()
with open(html_filepath,"r", encoding="utf-8") as file: # encoding = "utf-8" allows for German Umlaute
content = file.read()
return content
def find_bremen_calling_logo():
"""
find the path towards the logo file (located at 'brecal\src\BreCalClient\Resources\logo_bremen_calling.png')
"""
# __file__ is services/email_handling.py
# parent of __file__ is services
# parent of services is BreCal
src_root_folder = os.path.dirname(os.path.dirname(__file__)) # .../BreCal
resource_root_folder = os.path.join(src_root_folder, "resources")
path = os.path.join(resource_root_folder, "logo_bremen_calling.png")
assert os.path.exists(path), f"cannot find logo of bremen calling at path: {os.path.abspath(path)}"
return path
def add_bremen_calling_logo(msg_multipart):
"""
The image is not attached automatically when it is embedded to the content. To circumvent this,
one commonly creates attachments, which are referred to in the email content.
The content body refers to 'LogoBremenCalling', which the 'Content-ID' of the logo is assigned as.
"""
path = find_bremen_calling_logo()
with open(path, 'rb') as file:
attachment = MIMEApplication(file.read(), _subtype=mimetypes.MimeTypes().guess_type(path), Name="bremen_calling.png")
attachment.add_header('Content-Disposition','attachment',filename=str(os.path.basename(path)))
attachment.add_header('Content-ID', '<LogoBremenCalling>')
msg_multipart.attach(attachment)
return msg_multipart
def create_shipcall_evaluation_notification(email_handler, ship_name:str, evaluation_message:str, eta_etd_str:str, eta_etd_type:str, content:str, files:typing.Optional[list[str]]):
"""
email_handler : EmailHandler. Contains meta-level information about the mail server and sender's Email.
ship_name : str. Name of the referenced ship, so the user knows the context.
evaluation_message : str. Brief description of the current evaluation state
eta_etd_str : str. Readable format of a datetime.datetime object, which is either ETA, ETD or both. Informs the user about when the shipcall is due.
eta_etd_type : str. Reference to the time, whether it arrives/leaves/shifts.
content : str (or filepath). Should refer to the template, which defines the content. This file contains HTML-structured text.
files: (optional). List of file paths, which are included as attachments.
"""
subject = f"{ship_name} (vorauss. {eta_etd_type}: {eta_etd_str})"
# create message_body
message_body = content # "Hello World."
evaluation_message_reformatted = evaluation_message.replace("\n", "<br>")
adaptive_content = f'<br>Betrifft: {ship_name} ({eta_etd_str})<font size="1"><br>{evaluation_message_reformatted}</font>'
message_body = message_body.replace("#ADAPTIVECONTENT", adaptive_content)
msg = email_handler.create_email(subject=subject, message_body=message_body, subtype="html")
msg_multipart = email_handler.translate_mail_to_multipart(msg=msg)
if files is not None:
for path in files:
assert os.path.exists(path), f"cannot find attachment at path: {path}"
email_handler.attach_file(path, msg=msg_multipart)
# add the bremen calling logo, which is referred to in the email body
msg_multipart = add_bremen_calling_logo(msg_multipart)
return (msg_multipart,content)
def send_notification(email_handler, email_tgts, msg, pwd, debug=False):
already_logged_in = email_handler.check_login()
if not already_logged_in:
email_handler.login(interactive=False, pwd=pwd)
try:
assert email_handler.check_login()
if not debug:
email_handler.send_email(msg, email_tgts)
else:
print(f"(send_notification INFO): debugging state. Would have sent an Email to: {email_tgts}")
finally:
if not already_logged_in:
email_handler.close()
return

View File

@ -4,6 +4,7 @@ from BreCal.schemas import model
from BreCal.local_db import getPoolConnection
from BreCal.database.update_database import evaluate_shipcall_state
from BreCal.database.sql_queries import create_sql_query_shipcall_get
from BreCal.notifications.notifier import Notifier
import threading
import schedule
@ -50,8 +51,8 @@ def add_function_to_schedule__update_shipcalls(interval_in_minutes:int, options:
schedule.every(interval_in_minutes).minutes.do(UpdateShipcalls, **kwargs_)
return
def add_function_to_schedule__send_notifications(vr, interval_in_minutes:int=10):
schedule.every(interval_in_minutes).minutes.do(vr.notifier.send_notifications)
def add_function_to_schedule__send_notifications(interval_in_minutes:int=15):
schedule.every(interval_in_minutes).minutes.do(Notifier.send_notifications)
return
@ -64,8 +65,8 @@ def setup_schedule(update_shipcalls_interval_in_minutes:int=60):
# update the evaluation state in every recent shipcall
add_function_to_schedule__update_shipcalls(update_shipcalls_interval_in_minutes)
# placeholder: create/send notifications
# add_function_to_schedule__send_notifications(...)
# create/send notifications
add_function_to_schedule__send_notifications(15)
return

View File

@ -1,21 +1,21 @@
import datetime
from BreCal.stubs import generate_uuid1_int
from BreCal.schemas.model import Notification
from BreCal.schemas.model import Notification, NotificationType
def get_notification_simple():
"""creates a default notification, where 'created' is now, and modified is now+10 seconds"""
notification_id = generate_uuid1_int() # uid?
times_id = generate_uuid1_int() # uid?
level = 10
type = 0
shipcall_id = 85
level = 2
type = NotificationType.email
message = "hello world"
created = datetime.datetime.now()
modified = created+datetime.timedelta(seconds=10)
notification = Notification(
notification_id,
times_id,
shipcall_id,
level,
type,
message,

View File

@ -76,8 +76,17 @@ class TimeLogic():
minute_delta = delta / np.timedelta64(1, unit)
return minute_delta
def time_delta_from_now_to_tgt(self, tgt_time, unit="m"):
return self.time_delta(datetime.datetime.now(), tgt_time=tgt_time, unit=unit)
def time_delta_from_now_to_tgt(self, tgt_time, unit="m", now_time=None):
"""
This method computes the timedelta between a target time {tgt_time} and the current timestamp. For the purpose of
reproducibility and testing, the current timestamp {now_time} can be overwritten. The default behaviour uses the
datetime.now() function.
"""
if now_time is None:
return self.time_delta(datetime.datetime.now(), tgt_time=tgt_time, unit=unit)
else:
assert isinstance(now_time,datetime.datetime), f"incorrect type for now_time: {now_time} with type {type(now_time)}"
return self.time_delta(now_time, tgt_time=tgt_time, unit=unit)
def time_inbetween(self, query_time:datetime.datetime, start_time:datetime.datetime, end_time:datetime.datetime) -> bool:
"""

View File

@ -929,6 +929,9 @@ class ValidationRuleFunctions(ValidationRuleBaseFunctions):
query_time = times_agency.iloc[0].eta_berth
# count the number of times, where a times entry is very close to the query time (uses an internal threshold, such as 15 minutes)
if all_times_agency is None:
all_times_agency = self.sql_handler.get_times_for_agency(non_null_column="eta_berth")
counts = self.sql_handler.count_synchronous_shipcall_times(query_time, all_df_times=all_times_agency)
violation_state = counts > maximum_threshold
@ -952,6 +955,9 @@ class ValidationRuleFunctions(ValidationRuleBaseFunctions):
times_agency = df_times.loc[df_times["participant_type"]==ParticipantType.AGENCY.value]
query_time = times_agency.iloc[0].etd_berth
if all_times_agency is None:
all_times_agency = self.sql_handler.get_times_for_agency(non_null_column="etd_berth")
# count the number of times, where a times entry is very close to the query time (uses an internal threshold, such as 15 minutes)
counts = self.sql_handler.count_synchronous_shipcall_times(query_time, all_df_times=all_times_agency)
violation_state = counts > maximum_threshold

View File

@ -1,3 +1,4 @@
import typing
import copy
import logging
import re
@ -74,6 +75,9 @@ class ValidationRules(ValidationRuleFunctions):
"""apply 'evaluate_shipcall_from_df' to each individual shipcall in {shipcall_df}. Returns shipcall_df ('evaluation', 'evaluation_message', 'evaluation_time' and 'evaluation_notifications_sent' are updated)"""
evaluation_states_old = [state_old for state_old in shipcall_df.loc[:,"evaluation"]]
evaluation_states_old = [state_old if not pd.isna(state_old) else 0 for state_old in evaluation_states_old]
evaluation_notifications_sent_old = [ens for ens in shipcall_df.loc[:,"evaluation_notifications_sent"]]
results = shipcall_df.apply(lambda x: self.evaluate_shipcall_from_df(x), axis=1).values # returns tuple (state, message)
# unbundle individual results. evaluation_states becomes an integer, violation
@ -82,6 +86,15 @@ class ValidationRules(ValidationRuleFunctions):
violations = [self.concise_evaluation_message_if_too_long(violation) for violation in violations]
# build the list of evaluation times ('now', as isoformat)
evaluation_time = self.get_notification_times(evaluation_states_new)
# build the list of 'evaluation_notifications_sent'. The value is 'False', when a notification should be created
evaluation_notifications_sent = self.get_notification_states(evaluation_states_old, evaluation_states_new, evaluation_notifications_sent_old)
shipcall_df.loc[:,"evaluation"] = evaluation_states_new
shipcall_df.loc[:,"evaluation_message"] = violations
shipcall_df.loc[:,"evaluation_time"] = evaluation_time
shipcall_df.loc[:,"evaluation_notifications_sent"] = evaluation_notifications_sent
#evaluation_time = self.get_notification_times(evaluation_states_new)
# build the list of 'evaluation_notifications_sent'. The value is 'False', when a notification should be created
@ -112,16 +125,24 @@ class ValidationRules(ValidationRuleFunctions):
"""this function should apply the ValidationRules to the respective .shipcall, in regards to .times"""
return (StatusFlags.GREEN, False) # (state:str, should_notify:bool)
def determine_notification_state(self, state_old, state_new):
def determine_notification_state(self, state_old, state_new, evaluation_notifications_sent)->typing.Optional[bool]:
"""
this method determines state changes in the notification state. When the state increases, a user is notified about it.
state order: (NONE = GREEN < YELLOW < RED)
If a notification shall be sent, this method returns False. If no notification shall be sent, this method returns None or the prior state.
The method *never* returns True, as it shall only be called on novel shipcalls.
args:
evaluation_notifications_sent: the PREVIOUS state (if any) of this boolean. When no notification is required, the prior bool is used (e.g., None, False, True).
"""
previous_state = evaluation_notifications_sent
# identify a state increase
should_notify = self.identify_notification_state_change(state_old=state_old, state_new=state_new)
# when a state increases, a notification must be sent. Thereby, the field should be set to False ({evaluation_notifications_sent})
evaluation_notifications_sent = False if bool(should_notify) else None
evaluation_notifications_sent = False if bool(should_notify) else previous_state
return evaluation_notifications_sent
def identify_notification_state_change(self, state_old, state_new) -> bool:
@ -147,13 +168,13 @@ class ValidationRules(ValidationRuleFunctions):
return int(state_new) > int(state_old)
def get_notification_times(self, evaluation_states_new)->list[datetime.datetime]:
"""# build the list of evaluation times ('now', as isoformat)"""
evaluation_times = [datetime.datetime.now().isoformat() for _i in range(len(evaluation_states_new))]
"""# build the list of evaluation times ('now'-datetime)"""
evaluation_times = [datetime.datetime.now() for _i in range(len(evaluation_states_new))] # .isoformat()
return evaluation_times
def get_notification_states(self, evaluation_states_old, evaluation_states_new)->list[bool]:
"""# build the list of 'evaluation_notifications_sent'. The value is 'False', when a notification should be created"""
evaluation_notifications_sent = [self.determine_notification_state(state_old=int(state_old), state_new=int(state_new)) for state_old, state_new in zip(evaluation_states_old, evaluation_states_new)]
def get_notification_states(self, evaluation_states_old, evaluation_states_new, evaluation_notifications_sent_old)->list[typing.Optional[bool]]:
"""# build the list of 'evaluation_notifications_sent'. The value is 'False', when a notification should be created and None, when not"""
evaluation_notifications_sent = [self.determine_notification_state(state_old=int(state_old), state_new=int(state_new), evaluation_notifications_sent=ens) for state_old, state_new, ens in zip(evaluation_states_old, evaluation_states_new, evaluation_notifications_sent_old)]
return evaluation_notifications_sent

View File

@ -0,0 +1,21 @@
import pytest
from BreCal.notifications.accounts import mail_server, mail_port, mail_address, mail_pwd
def test_mail_server():
assert isinstance(mail_server, str)
assert not "@" in mail_server
return
def test_mail_port():
assert isinstance(mail_port, int)
return
def test_mail_address():
assert isinstance(mail_address, str)
assert "@" in mail_address
return
def test_mail_pwd():
assert isinstance(mail_pwd, bytes), f"must be a bytes-encoded password to protect the account"
return

View File

View File

@ -0,0 +1,14 @@
import pytest
import os
def test_find_bremen_calling_logo():
from BreCal.services.email_handling import find_bremen_calling_logo
path = find_bremen_calling_logo()
assert os.path.exists(path), f"cannot find the bremen calling logo file, which is needed for notifications (e.g., Email). Searched at path: \n\t{path}"
return
def test_find_warning_notification_email_template():
from BreCal.services.email_handling import find_warning_notification_email_template
path = find_warning_notification_email_template()
assert os.path.exists(path), f"cannot find the required email template, which is needed for warning notifications. Searched at path: \n\t{path}"
return