implementing notifications, working on input validation. rebase.

This commit is contained in:
scopesorting 2024-01-19 14:22:54 +01:00 committed by Max Metz
parent 1cc47d1eaf
commit 863d265669
7 changed files with 182 additions and 58 deletions

View File

@ -26,6 +26,8 @@ class ParticipantwiseTimeDelta():
TUG = 960.0 # 16 h * 60 min/h = 960 min
TERMINAL = 960.0 # 16 h * 60 min/h = 960 min
NOTIFICATION = 10.0 # after n minutes, an evaluation may rise a notification
class StatusFlags(Enum):
"""
these enumerators ensure that each traffic light validation rule state corresponds to a value, which will be used in the ValidationRules object to identify
@ -39,3 +41,10 @@ class StatusFlags(Enum):
class PierSide(IntEnum):
PORTSIDE = 0 # Port/Backbord
STARBOARD_SIDE = 1 # Starboard / Steuerbord
class NotificationType(IntFlag):
"""determines the method by which a notification is distributed to users. Flagging allows selecting multiple notification types."""
UNDEFINED = 0
EMAIL = 1
POPUP = 2
MESSENGER = 4

View File

@ -0,0 +1,115 @@
import datetime
import pandas as pd
from BreCal.schemas.model import Notification
from BreCal.database.enums import NotificationType, ParticipantType, ShipcallType, StatusFlags
def create_notification(id, times_id, message, level, notification_type:NotificationType, created=None, modified=None):
created = (datetime.datetime.now()).isoformat() or created
notification = Notification(
id=id,
times_id=times_id, acknowledged=False, level=level, type=notification_type.value, message=message, created=created, modified=modified
)
return notification
#### Verbosity Functions ####
def get_default_header()->str:
# HEADER (greeting and default message)
header = "Dear Sir or Madam\n\nThank you for participating in the project 'Bremen Calling'. During analysis, our software has identified an event, which may be worth a second look. Here is the summary. \n\n"
return header
def get_default_footer()->str:
# FOOTER (signature)
footer = "\n\nWe would kindly ask you to have a look at the shipcall and verify, if any action is required from your side. \n\nKind regards\nThe 'Bremen Calling' Team"
return footer
def get_agency_name(sql_handler, times_df):
times_agency = times_df.loc[times_df["participant_type"]==ParticipantType.AGENCY.value,"participant_id"]
if len(times_agency)==0:
agency_name = ""
else:
agency_participant_id = times_agency.iloc[0]
agency_name = sql_handler.df_dict.get("participant").loc[agency_participant_id,"name"]
return agency_name
def get_ship_name(sql_handler, shipcall):
ship = sql_handler.df_dict.get("ship").loc[shipcall.ship_id]
ship_name = ship.loc["name"] # when calling ship.name, the ID is returned (pandas syntax)
return ship_name
def create_notification_body(sql_handler, times_df, shipcall, result)->str:
# #TODO: add 'Link zum Anlauf'
# URL: https://trello.com/c/qenZyJxR/75-als-bsmd-m%C3%B6chte-ich-%C3%BCber-gelbe-und-rote-ampeln-informiert-werden-um-die-systembeteiligung-zu-st%C3%A4rken
header = get_default_header()
footer = get_default_footer()
agency_name = get_agency_name(sql_handler, times_df)
ship_name = get_ship_name(sql_handler, shipcall)
verbosity_introduction = f"Respective Shipcall:\n"
traffic_state_verbosity = f"\tTraffic Light State: {StatusFlags(result[0]).name}\n"
ship_name_verbosity = f"\tShip: {ship_name} (the ship is {ShipcallType(shipcall.type).name.lower()})\n"
agency_name_verbosity = f"\tResponsible Agency: {agency_name}\n"
eta_verbosity = f"\tEstimated Arrival Time: {shipcall.eta.isoformat()}\n" if not pd.isna(shipcall.eta) else ""
etd_verbosity = f"\tEstimated Departure Time: {shipcall.etd.isoformat()}\n" if not pd.isna(shipcall.etd) else ""
error_verbosity = f"\nError Description:\n\t" + "\n\t".join(result[1])
message_body = "".join([header, verbosity_introduction, traffic_state_verbosity, ship_name_verbosity, agency_name_verbosity, eta_verbosity, etd_verbosity, error_verbosity, footer])
return message_body
class Notifier():
"""An object that helps with the logic of selecting eligible shipcalls to create the correct notifications for the respective users."""
def __init__(self)->None:
pass
def determine_notification_state(self, state_old, state_new):
"""
this method determines state changes in the notification state. When the state increases, a user is notified about it.
state order: (NONE = GREEN < YELLOW < RED)
"""
# identify a state increase
should_notify = self.identify_notification_state_change(state_old=state_old, state_new=state_new)
# when a state increases, a notification must be sent. Thereby, the field should be set to False ({evaluation_notifications_sent})
evaluation_notifications_sent = False if bool(should_notify) else None
return evaluation_notifications_sent
def identify_notification_state_change(self, state_old, state_new) -> bool:
"""
determines, whether the observed state change should trigger a notification.
internally, this function maps StatusFlags to an integer and determines, if the successor state is more severe than the predecessor.
state changes trigger a notification in the following cases:
green -> yellow
green -> red
yellow -> red
(none -> yellow) or (none -> red)
due to the values in the enumeration objects, the states are mapped to provide this function.
green=1, yellow=2, red=3, none=1. Hence, critical changes can be observed by simply checking with "greater than".
returns bool, whether a notification should be triggered
"""
# state_old is always considered at least 'Green' (1)
if state_old is None:
state_old = StatusFlags.NONE.value
state_old = max(int(state_old), StatusFlags.GREEN.value)
return int(state_new) > int(state_old)
def get_notification_times(self, evaluation_states_new)->list[datetime.datetime]:
"""# build the list of evaluation times ('now', as isoformat)"""
evaluation_times = [datetime.datetime.now().isoformat() for _i in range(len(evaluation_states_new))]
return evaluation_times
def get_notification_states(self, evaluation_states_old, evaluation_states_new)->list[bool]:
"""# build the list of 'evaluation_notifications_sent'. The value is 'False', when a notification should be created"""
evaluation_notifications_sent = [self.notifier.determine_notification_state(state_old=int(state_old), state_new=int(state_new)) for state_old, state_new in zip(evaluation_states_old, evaluation_states_new)]
return evaluation_notifications_sent

View File

@ -184,11 +184,18 @@ class ShipcallSchema(Schema):
anchored = fields.Bool(Required = False, allow_none=True)
moored_lock = fields.Bool(Required = False, allow_none=True)
canceled = fields.Bool(Required = False, allow_none=True)
<<<<<<< HEAD
evaluation = fields.Enum(EvaluationType, required=False, allow_none=True, default=EvaluationType.undefined)
evaluation_message = fields.Str(allow_none=True, metadata={'Required':False}) # Solving: RemovedInMarshmallow4Warning: Passing field metadata as keyword arguments is deprecated. Use the explicit `metadata=...` argument instead. Additional metadata: {'Required': False}
evaluation_time = fields.DateTime(Required = False, allow_none=True)
evaluation_notifications_sent = fields.Bool(Required = False, allow_none=True)
time_ref_point = fields.Int(Required = False, allow_none=True)
=======
evaluation = fields.Integer(Required = False, allow_none=True)
evaluation_message = fields.String(allow_none=True, metadata={'Required':False}) # Solving: RemovedInMarshmallow4Warning: Passing field metadata as keyword arguments is deprecated. Use the explicit `metadata=...` argument instead. Additional metadata: {'Required': False}
evaluation_time = fields.DateTime(Required = False, allow_none=True)
evaluation_notifications_sent = fields.Bool(Required = False, allow_none=True)
>>>>>>> a5284a4 (implementing notifications, working on input validation)
participants = fields.List(fields.Nested(ParticipantAssignmentSchema))
created = fields.DateTime(Required = False, allow_none=True)
modified = fields.DateTime(Required = False, allow_none=True)
@ -251,6 +258,8 @@ class Shipcall:
created: datetime
modified: datetime
participants: List[Participant_Assignment] = field(default_factory=list)
evaluation_time : datetime = None
evaluation_notifications_sent : bool = None
def to_json(self):
return {

View File

@ -57,6 +57,11 @@ def add_function_to_schedule__update_shipcalls(interval_in_minutes:int, options:
schedule.every(interval_in_minutes).minutes.do(UpdateShipcalls, **kwargs_)
return
def add_function_to_schedule__send_notifications(vr, interval_in_minutes:int=10):
schedule.every(interval_in_minutes).minutes.do(vr.notifier.send_notifications)
return
def setup_schedule(update_shipcalls_interval_in_minutes:int=60):
logging.getLogger('schedule').setLevel(logging.INFO); # set the logging level of the schedule module to INFO

View File

@ -37,12 +37,12 @@ def get_shipcall_simple():
recommended_tugs = 2 # assert 0<recommended_tugs<={threshold}. E.g., 20 should not be exceeded.
anchored = False
moored_lock = False # de: 'Festmacherschleuse', en: 'moored lock'
canceled = False
evaluation = None
evaluation_message = ""
evaluation_time = None
evaluation_notifications_sent = False
evaluation_notifications_sent = None
created = datetime.datetime.now()
modified = created+datetime.timedelta(seconds=10)

View File

@ -3,9 +3,11 @@ import logging
import re
import numpy as np
import pandas as pd
import datetime
from BreCal.database.enums import StatusFlags
from BreCal.validators.validation_rule_functions import ValidationRuleFunctions
from BreCal.schemas.model import Shipcall
from BreCal.notifications.notification_functions import Notifier
class ValidationRules(ValidationRuleFunctions):
@ -17,10 +19,7 @@ class ValidationRules(ValidationRuleFunctions):
"""
def __init__(self, sql_handler): # use the entire data that is provided for this query (e.g., json input)
super().__init__(sql_handler)
self.validation_state = self.determine_validation_state()
# currently flagged: notification_state initially was based on using one ValidationRules object for each query. This is deprecated.
# self.notification_state = self.determine_notification_state() # (state:str, should_notify:bool)
self.notifier = Notifier()
return
def evaluate(self, shipcall):
@ -74,16 +73,25 @@ class ValidationRules(ValidationRuleFunctions):
return evaluation_state, violations
def evaluate_shipcalls(self, shipcall_df:pd.DataFrame)->pd.DataFrame:
"""apply 'evaluate_shipcall_from_df' to each individual shipcall in {shipcall_df}. Returns shipcall_df ('evaluation' and 'evaluation_message' are updated)"""
results = shipcall_df.apply(lambda x: self.evaluate_shipcall_from_df(x), axis=1).values
"""apply 'evaluate_shipcall_from_df' to each individual shipcall in {shipcall_df}. Returns shipcall_df ('evaluation', 'evaluation_message', 'evaluation_time' and 'evaluation_notifications_sent' are updated)"""
evaluation_states_old = [state_old for state_old in shipcall_df.loc[:,"evaluation"]]
results = shipcall_df.apply(lambda x: self.evaluate_shipcall_from_df(x), axis=1).values # returns tuple (state, message)
# unbundle individual results. evaluation_state becomes an integer, violation
evaluation_state = [StatusFlags(res[0]).value for res in results]
# unbundle individual results. evaluation_states becomes an integer, violation
evaluation_states_new = [StatusFlags(res[0]).value for res in results]
violations = [",\r\n".join(res[1]) if len(res[1])>0 else None for res in results]
violations = [self.concise_evaluation_message_if_too_long(violation) for violation in violations]
shipcall_df.loc[:,"evaluation"] = evaluation_state
# build the list of evaluation times ('now', as isoformat)
evaluation_times = self.notifier.get_notification_times(evaluation_states_new)
# build the list of 'evaluation_notifications_sent'. The value is 'False', when a notification should be created
evaluation_notifications_sent = self.get_notification_states(evaluation_states_old, evaluation_states_new)
shipcall_df.loc[:,"evaluation"] = evaluation_states_new
shipcall_df.loc[:,"evaluation_message"] = violations
shipcall_df.loc[:,"evaluation_times"] = evaluation_times
shipcall_df.loc[:,"evaluation_notifications_sent"] = evaluation_notifications_sent
return shipcall_df
def concise_evaluation_message_if_too_long(self, violation):
@ -100,53 +108,31 @@ class ValidationRules(ValidationRuleFunctions):
# e.g.: Evaluation message too long. Violated Rules: ['Rule #0001C', 'Rule #0001H', 'Rule #0001F', 'Rule #0001G', 'Rule #0001L', 'Rule #0001M', 'Rule #0001J', 'Rule #0001K']
violation = f"Evaluation message too long. Violated Rules: {concise}"
return violation
def determine_validation_state(self) -> str:
"""
this method determines the validation state of a shipcall. The state is either ['green', 'yellow', 'red'] and signals,
whether an entry causes issues within the workflow of users.
returns: validation_state_new (str)
"""
(validation_state_new, description) = self.undefined_method()
# should there also be notifications for critical validation states? In principle, the traffic light itself provides that notification.
self.validation_state = validation_state_new
return validation_state_new
def determine_notification_state(self) -> (str, bool):
"""
this method determines state changes in the notification state. When the state is changed to yellow or red,
a user is notified about it. The only exception for this rule is when the state was yellow or red before,
as the user has then already been notified.
returns: notification_state_new (str), should_notify (bool)
"""
(state_new, description) = self.undefined_method() # determine the successor
should_notify = self.identify_notification_state_change(state_new)
self.notification_state = state_new # overwrite the predecessor
return state_new, should_notify
def identify_notification_state_change(self, state_new) -> bool:
"""
determines, whether the observed state change should trigger a notification.
internally, this function maps a color string to an integer and determines, if the successor state is more severe than the predecessor.
state changes trigger a notification in the following cases:
green -> yellow
green -> red
yellow -> red
(none -> yellow) or (none -> red)
due to the values in the enumeration objects, the states are mapped to provide this function.
green=1, yellow=2, red=3, none=1. Hence, critical changes can be observed by simply checking with "greater than".
returns bool, whether a notification should be triggered
"""
# state_old is always considered at least 'Green' (1)
state_old = max(copy.copy(self.notification_state) if "notification_state" in list(self.__dict__.keys()) else StatusFlags.NONE, StatusFlags.GREEN.value)
return state_new.value > state_old.value
def undefined_method(self) -> str:
"""this function should apply the ValidationRules to the respective .shipcall, in regards to .times"""
# #TODO_traffic_state
return (StatusFlags.GREEN, False) # (state:str, should_notify:bool)
def inspect_shipcall_evaluation(vr, sql_handler, shipcall_id):
"""
# debug only!
a simple debugging function, which serves in inspecting an evaluation function for a single shipcall id. It returns the result and all related data.
returns: result, shipcall_df (filtered by shipcall id), shipcall, spm (shipcall participant map, filtered by shipcall id), times_df (filtered by shipcall id)
"""
shipcall_df = sql_handler.df_dict.get("shipcall").loc[shipcall_id:shipcall_id,:]
shipcall = Shipcall(**{**{"id":shipcall_id},**sql_handler.df_dict.get("shipcall").loc[shipcall_id].to_dict()})
result = vr.evaluate(shipcall=shipcall)
notification_state = vr.identify_notification_state_change(state_old=int(shipcall.evaluation), state_new=int(result[0]))
print(f"Previous state: {int(shipcall.evaluation)}, New State: {result[0]}, Notification State: {notification_state}")
times_df = sql_handler.df_dict.get("times")
times_df = times_df.loc[times_df["shipcall_id"]==shipcall_id]
spm = sql_handler.df_dict["shipcall_participant_map"]
spm = spm.loc[spm["shipcall_id"]==shipcall_id]
return result, shipcall_df, shipcall, spm, times_df