Created Notifier object, which provides the logic to create notifications and issue them to the respective users, depending on the type of notification they have selected. Building the skeleton, where the methods will be filled functionally soon. Adapted the History-schema.

This commit is contained in:
Max Metz 2024-06-10 08:09:51 +02:00
parent be6c898415
commit 36b6173b36
10 changed files with 401 additions and 14 deletions

View File

@ -11,6 +11,10 @@ bp = Blueprint('user', __name__)
@bp.route('/user', methods=['put'])
@auth_guard() # no restriction by role
def PutUser():
# #TODO: user validation should be extended by the notifications. When someone wants to set
# notify_email = 1, the email must be either present or part of the loadedModel
# notify_whatsapp = 1, there must be a phone number (same for notify_signal)
# notify_push = 1, there must be a phone number (#TODO_determine ... or an app-id? Unclear still)
try:
content = request.get_json(force=True)

View File

@ -213,7 +213,7 @@ class SQLQuery():
@staticmethod
def get_history()->str:
query = "SELECT id, participant_id, shipcall_id, timestamp, eta, type, operation FROM history WHERE shipcall_id = ?shipcallid?"
query = "SELECT id, participant_id, shipcall_id, user_id, timestamp, eta, type, operation FROM history WHERE shipcall_id = ?shipcallid?"
return query
@staticmethod
@ -412,14 +412,14 @@ class SQLQuery():
@staticmethod
def get_notification_post()->str:
raise NotImplementedError()
raise NotImplementedError("skeleton")
# #TODO: this query is wrong and just a proxy for a POST request
query = "INSERT INTO shipcall_participant_map (shipcall_id, participant_id, type) VALUES (?shipcall_id?, ?participant_id?, ?type?)"
return query
@staticmethod
def get_shipcall_put_notification_state()->str:
raise NotImplementedError()
raise NotImplementedError("skeleton")
# #TODO: use evaluation_notifications_sent here and consider only the shipcall_id
# #TODO: query
query = ...

View File

@ -23,7 +23,7 @@ def GetHistory(options):
if "shipcall_id" in options and options["shipcall_id"]:
# query = SQLQuery.get_history()
# data = commands.query(query, model=History.from_query_row, param={"shipcallid" : options["shipcall_id"]})
data = commands.query("SELECT id, participant_id, shipcall_id, timestamp, eta, type, operation FROM history WHERE shipcall_id = ?shipcallid?",
data = commands.query("SELECT id, participant_id, shipcall_id, user_id, timestamp, eta, type, operation FROM history WHERE shipcall_id = ?shipcallid?",
model=History.from_query_row,
param={"shipcallid" : options["shipcall_id"]})

View File

@ -0,0 +1,53 @@
import pandas as pd
from server.BreCal.database.enums import ParticipantType, ShipcallType, StatusFlags
#### Verbosity Functions ####
def get_default_header()->str:
# HEADER (greeting and default message)
header = "Dear Sir or Madam\n\nThank you for participating in the project 'Bremen Calling'. During analysis, our software has identified an event, which may be worth a second look. Here is the summary. \n\n"
return header
def get_default_footer()->str:
# FOOTER (signature)
footer = "\n\nWe would kindly ask you to have a look at the shipcall and verify, if any action is required from your side. \n\nKind regards\nThe 'Bremen Calling' Team"
return footer
def get_agency_name(sql_handler, times_df):
times_agency = times_df.loc[times_df["participant_type"]==ParticipantType.AGENCY.value,"participant_id"]
if len(times_agency)==0:
agency_name = ""
else:
agency_participant_id = times_agency.iloc[0]
agency_name = sql_handler.df_dict.get("participant").loc[agency_participant_id,"name"]
return agency_name
def get_ship_name(sql_handler, shipcall):
ship = sql_handler.df_dict.get("ship").loc[shipcall.ship_id]
ship_name = ship.loc["name"] # when calling ship.name, the ID is returned (pandas syntax)
return ship_name
def create_notification_body(sql_handler, times_df, shipcall, result)->str:
# #TODO: add 'Link zum Anlauf'
# URL: https://trello.com/c/qenZyJxR/75-als-bsmd-m%C3%B6chte-ich-%C3%BCber-gelbe-und-rote-ampeln-informiert-werden-um-die-systembeteiligung-zu-st%C3%A4rken
header = get_default_header()
footer = get_default_footer()
agency_name = get_agency_name(sql_handler, times_df)
ship_name = get_ship_name(sql_handler, shipcall)
verbosity_introduction = f"Respective Shipcall:\n"
traffic_state_verbosity = f"\tTraffic Light State: {StatusFlags(result[0]).name}\n"
ship_name_verbosity = f"\tShip: {ship_name} (the ship is {ShipcallType(shipcall.type).name.lower()})\n"
agency_name_verbosity = f"\tResponsible Agency: {agency_name}\n"
eta_verbosity = f"\tEstimated Arrival Time: {shipcall.eta.isoformat()}\n" if not pd.isna(shipcall.eta) else ""
etd_verbosity = f"\tEstimated Departure Time: {shipcall.etd.isoformat()}\n" if not pd.isna(shipcall.etd) else ""
error_verbosity = f"\nError Description:\n\t" + "\n\t".join(result[1])
message_body = "".join([header, verbosity_introduction, traffic_state_verbosity, ship_name_verbosity, agency_name_verbosity, eta_verbosity, etd_verbosity, error_verbosity, footer])
return message_body

View File

@ -0,0 +1,13 @@
import datetime
from BreCal.schemas.model import Notification
from BreCal.database.enums import NotificationType, StatusFlags
def create_notification(id, times_id, message, level, notification_type:NotificationType, created=None, modified=None):
# #TODO_determine: determine, whether this function is still in active use. The data-model seems outdated.
created = (datetime.datetime.now()).isoformat() or created
notification = Notification(
id=id,
times_id=times_id, acknowledged=False, level=level, type=notification_type.value, message=message, created=created, modified=modified
)
return notification

View File

@ -0,0 +1,313 @@
import typing
from BreCal.database.sql_handler import execute_sql_query_standalone
from BreCal.database.sql_queries import SQLQuery
from BreCal.schemas import model
class Notifier():
"""
This class provides quick access to different notification functions.
Each method is callable without initializing the Notifier object.
Example:
Notifier.send_notifications(*args)
The Notifier has three main methods.
Notifier.send_notifications() --- can be called routinely. Identifies all candidates and notifies the users
Notifier.send_notification(shipcall) --- applies filters to identify, whether a notification is desired. If so, notifies the users
Notifer.create(user) --- 'naive' method, which simply creates a message and sends it to the user's preferred choice
# #TODO_determine: it makes sense to go one step finer. .create could produce messages and recipients, whereas .publish may then issue those and .document may update the SQL dataset
## naming idea: Notifier.send_notifications, Notifier.send_notification, Notifier.send (which may contain .create, .publish, .document)
"""
def __init__(self) -> None:
pass
@staticmethod
def send_notifications(is_test:bool=False) -> None:
"""
This method is used in BreCal.services.schedule_routines and will issue notifications, once they are due.
It is purposely defined in a way, where no external dependencies or arguments are required. The only exception is the
'is_test' boolean, as it prevents the notifications from being *actually* sent as part of the pytests.
Steps:
- get all shipcalls
- filter: consider only those, which are not yet sent (uses shipcall.evaluation_notifications_sent)
- iterate over each remaining shipcall and apply .send_notification
- those which are unsent, shall be sent by the respective type
"""
raise NotImplementedError("skeleton")
# get all shipcalls
all_shipcalls = NotImplementedError
shipcalls = [shipcall for shipcall in all_shipcalls if not shipcall.evaluation_notifications_sent]
for shipcall in shipcalls:
notification_list = Notifier.send_notification(shipcall, is_test=is_test)
# #TODO: get all notifications
# #TODO: get matching shipcall (based on shipcall_id)
# #TODO: filter: consider only those, which are not yet sent
# identify necessity
# #TODO: get the 'evaluation_notifications_sent' field from all shipcalls (based on shipcall_id)
# if not -> return
# USE shipcall.evaluation_notifications_sent
# #TODO: those which are unsent, shall be created&sent by the respective type -- Note: consider the is_test argument
# iterate over the list of Notifier.build_notification_type_list
# one might use Notifier.create(..., update_database=True)
# use the History (GetHistory -- by shipcall_id) to identify all subscribed users
# #TODO: update the shipcall dataset ('evaluation_notifications_sent') -- Note: consider the is_test argument
# #TODO_clarify: how to handle the 'evaluation_notifications_sent', when there is no recipient?
return
@staticmethod
def send_notification(shipcall:model.Shipcall, is_test:bool=False)->list[model.Notification]:
"""
Complex-function, which is responsible of creating notification messages, issuing them to users and optionally updating
the database. The requirement is, that the notification is required and passes through an internal set of filters.
Steps:
- get all notifications of shipcall_id
- identify the assigned list of users
- apply all filters. When a filter triggers, exit. If not, create and send a notification.
"""
update_database = False if is_test else True
# #TODO: the concept of old state and new state must be refactored.
# old state: read shipcall_id from notifications and look for the latest finding (if None -> EvaluationType.undefined)
# new state: read shipcall_id from shipcalls and look for the *current* 'evaluation' (-> EvaluationType(value))
# get existing notifications by shipcall_id (list)
existing_notifications = Notifier.get_existing_notifications(shipcall_id=shipcall.id)
old_state = NotImplementedError
new_state = shipcall.evaluation
# get User by querying all History objects of a shipcall_id
users = Notifier.get_users_via_history(shipcall_id=shipcall.id)
# identify necessity
# state-check: Did the 'evaluation' shift to a higher level of severity?
severity_bool = Notifier.check_higher_severity(old_state, new_state)
if not severity_bool:
return None
# #TODO: time-based filter. There shall be 'enough' time between the evaluation time and NOW
evaluation_time = shipcall.evaluation_time
# latency_bool = #TODO_DIFFERENCE_FROM_NOW_TO_EVALUATION_TIME____THIS_METHOD_ALREADY_EXISTS(evaluation_time)
# careful: what is True, what is False?
# if latency_booL:
# return None
notification_list = []
for user in users:
notification = Notifier.create(
shipcall.id,
old_state,
new_state,
user,
update_database=update_database,
is_test=is_test
)
notification_list.append(notification)
return notification_list
@staticmethod
def publish(shipcall_id, old_state, new_state, user, update_database:bool=False)->typing.Optional[model.Notification]:
"""
Complex-function, which creates, sends and documents a notification. It serves as a convenience function.
The method does not apply internal filters to identify, whether a notification should be created in the first place.
options:
update_database: bool.
# #TODO: instead of update_database, one may also use is_test
"""
# 1.) create
# ... = Notifier.create(shipcall_id, old_state, new_state, user) # e.g., might return a dictionary of dict[model.NotificationType, str], where str is the message
# 2.) send
# ... = Notifier.send(...) # should contain internal 'logistics', which user the respective handlers to send notifications
# 3.) document (mysql database)
# if update_database
# ... = Notifier.document(...)
raise NotImplementedError("skeleton")
return
@staticmethod
def create(shipcall_id, old_state, new_state, user, update_database:bool=False)->typing.Optional[model.Notification]:
"""
Standalone function, which creates a Notification for a specific user.
Steps:
- identify a list of notification_types, which shall be issued (based on the user's 'notify_*' settings)
- create messages based on the respective NotificationType, which the user has enabled
- send the messages
- update the shipcall dataset ('evaluation_notifications_sent')
args:
update_database: whether to update the MySQL database by posting the notification.
"""
assert user.id is not None
assert shipcall_id is not None
assert old_state is not None
assert new_state is not None
# get Shipcall by shipcall_id
shipcall = Notifier.get_shipcall(shipcall_id=shipcall_id)
"""
## TODO: this might simply be removed due to incorrect concept
## could also relocate this to the generation function, which identifies the notifications to be created
## should be unnecessary due to shipcall.evaluation_notifications_sent
# a) filter existing notifictions and consider only the dataset, where type (notification_type) and level (new_state) are suitable
notification_exists = Notifier.check_notification_type_and_level_exists(shipcall_id=shipcall_id, notification_type=notification_type, level=new_state, existing_notifications=existing_notifications)
if notification_exists:
return None
"""
# get a list of all subscribed notification types and track the state (success or failure)
successes = {}
notification_type_list = Notifier.build_notification_type_list(user)
for notification_type in notification_type_list:
# generate message based on the notification type
message = Notifier.generate_notification_message_by_type(notification_type, evaluation_message=shipcall.evaluation_message, user=user)
# send the message
success_state = Notifier.send_notification_by_type(notification_type, message)
successes[notification_type] = success_state
raise NotImplementedError("skeleton")
notification = ...
return notification
@staticmethod
def find_latest_notification(notifications:list[model.Notification])->typing.Optional[model.Notification]:
"""given a list of notification objects, this method returns the object, where the .created field corresponds to the *latest* notification object"""
latest_notification = sorted(notifications, key=lambda notification: notification.created, reverse=False)[-1] if len(notifications)>0 else None
return latest_notification
@staticmethod
def get_users_via_history(shipcall_id:int)->list[model.User]:
"""using the History objects, one can infer the user_id, which allows querying the Users"""
histories = execute_sql_query_standalone(query=SQLQuery.get_history(), param={"shipcallid" : shipcall_id}, model=model.History, command_type="query")
user_ids = [
history.user_id
for history in histories
]
users = [Notifier.get_user(user_id) for user_id in user_ids]
return users
@staticmethod
def get_user(user_id:int)->model.User:
"""Given a user_id, this method executes an SQL query to return a User"""
user = execute_sql_query_standalone(query=SQLQuery.get_user_by_id(), param={"id" : user_id}, model=model.User, command_type="single")
return user
@staticmethod
def get_shipcall(shipcall_id:int)->model.Shipcall:
"""Given a shipcall_id, this method executes an SQL query to return a Shipcall"""
shipcall = execute_sql_query_standalone(query=SQLQuery.get_shipcall_by_id(), param={"id" : shipcall_id}, model=model.Shipcall.from_query_row, command_type="single")
return shipcall
@staticmethod
def get_existing_notifications(shipcall_id:int)->list[model.Notification]:
existing_notifications = execute_sql_query_standalone(query=SQLQuery.get_notifications(), param={"scid" : shipcall_id}, model=model.Notification, command_type="query")
return existing_notifications
@staticmethod
def build_notification_type_list(user:model.User)->list[model.NotificationType]:
"""
based on a User, this method generates a list of notification types. These can be used as instructions to
generate the respective Notification datasets.
"""
notification_type_list = []
if user.notify_email:
notification_type_list.append(model.NotificationType.email)
if user.notify_popup:
notification_type_list.append(model.NotificationType.push)
if user.notify_whatsapp:
# currently not defined as a data model. Must be included / changed, once the data model of NotificationType is updated
notification_type_list.append(model.NotificationType.undefined)
if user.notify_signal:
# currently not defined as a data model. Must be included / changed, once the data model of NotificationType is updated
notification_type_list.append(model.NotificationType.undefined)
return notification_type_list
@staticmethod
def check_notification_type_and_level_exists(shipcall_id:int, notification_type:model.NotificationType, level:model.EvaluationType, existing_notifications:list[model.Notification])->bool:
"""This method checks, whether one of the Notification elements in the provided list is a perfect match to the arguments shipcall_id, notification_type and level"""
# #TODO_determine: should a notification be *skipped*, when there already is a dataset with
# identical level and type? ---> currently enabled.
# check, if any of the existing notifications is a perfect match for notification type & level & shipcall
matches = [note for note in existing_notifications if (int(level)==int(note.level)) and (int(note.type)==int(notification_type)) and (shipcall_id==note.shipcall_id)]
# bool: whether there is a perfect match
exists = len(matches)>0
raise Exception("deprecated")
return exists
@staticmethod
def check_higher_severity(old_state:model.EvaluationType, new_state:model.EvaluationType)->bool:
"""
determines, whether the observed state change should trigger a notification.
internally, this function maps StatusFlags to an integer and determines, if the successor state is more severe than the predecessor.
state changes trigger a notification in the following cases:
green -> yellow
green -> red
yellow -> red
(none -> yellow) or (none -> red)
due to the values in the enumeration objects, the states are mapped to provide this function.
green=1, yellow=2, red=3, none=1. Hence, critical changes can be observed by simply checking with "greater than".
returns bool, whether a notification should be triggered
"""
# undefined previous state: .undefined (0)
if old_state is None:
old_state = model.EvaluationType.undefined
# old_state is always considered at least .green (1) (hence, .undefined becomes .green)
old_state = max(int(old_state), model.EvaluationType.green)
# the IntEnum values are correctly sequenced. .red > .yellow > .green > .undefined
# as .undefined becomes .green, an old_state is always *at least* green.
severity_grew = int(new_state) > int(old_state)
return severity_grew
@staticmethod
def generate_notification_message_by_type(notification_type:model.NotificationType, evaluation_message:str, user:model.User):
assert isinstance(user, model.User)
if int(notification_type) == int(model.NotificationType.undefined):
raise NotImplementedError("skeleton")
elif int(notification_type) == int(model.NotificationType.email):
raise NotImplementedError("skeleton")
elif int(notification_type) == int(model.NotificationType.push):
raise NotImplementedError("skeleton")
#elif int(notification_type) == int(model.NotificationType.whatsapp):
#raise NotImplementedError("skeleton")
#elif int(notification_type) == int(model.NotificationType.signal):
#raise NotImplementedError("skeleton")
elif int(notification_type) == int(model.NotificationType.undefined):
raise NotImplementedError("skeleton")
else:
raise ValueError(notification_type)
return
"""# build the list of evaluation times ('now', as isoformat)"""
#evaluation_times = [datetime.datetime.now().isoformat() for _i in range(len(evaluation_states_new))]

View File

@ -85,10 +85,11 @@ class ShipcallType(IntEnum):
@dataclass
class History:
def __init__(self, id, participant_id, shipcall_id, timestamp, eta, type, operation):
def __init__(self, id, participant_id, shipcall_id, user_id, timestamp, eta, type, operation):
self.id = id
self.participant_id = participant_id
self.shipcall_id = shipcall_id
self.user_id = user_id
self.timestamp = timestamp
self.eta = eta
self.type = type
@ -98,6 +99,7 @@ class History:
id: int
participant_id: int
shipcall_id: int
user_id: int
timestamp: datetime
eta: datetime
type: ObjectType
@ -108,6 +110,7 @@ class History:
"id": self.id,
"participant_id": self.participant_id,
"shipcall_id": self.shipcall_id,
"user_id": self.user_id,
"timestamp": self.timestamp.isoformat() if self.timestamp else "",
"eta": self.eta.isoformat() if self.eta else "",
"type": self.type.name,
@ -115,8 +118,8 @@ class History:
}
@classmethod
def from_query_row(self, id, participant_id, shipcall_id, timestamp, eta, type, operation):
return self(id, participant_id, shipcall_id, timestamp, eta, ObjectType(type), OperationType(operation))
def from_query_row(self, id, participant_id, shipcall_id, user_id, timestamp, eta, type, operation):
return self(id, participant_id, shipcall_id, user_id, timestamp, eta, ObjectType(type), OperationType(operation))
class Error(Schema):
message = fields.String(metadata={'required':True})
@ -135,7 +138,7 @@ class Notification:
"""
id: int
shipcall_id: int # 'shipcall record that caused the notification'
level: int # 'severity of the notification'
level: int # 'severity of the notification'. #TODO_determine: Should this be identical to EvaluationType?
type: NotificationType # 'type of the notification'
message: str # 'individual message'
created: datetime

View File

@ -4,6 +4,7 @@ from BreCal.schemas import model
from BreCal.local_db import getPoolConnection
from BreCal.database.update_database import evaluate_shipcall_state
from BreCal.database.sql_queries import create_sql_query_shipcall_get
from BreCal.notifications.notifier import Notifier
import threading
import schedule
@ -51,7 +52,7 @@ def add_function_to_schedule__update_shipcalls(interval_in_minutes:int, options:
return
def add_function_to_schedule__send_notifications(vr, interval_in_minutes:int=10):
schedule.every(interval_in_minutes).minutes.do(vr.notifier.send_notifications)
schedule.every(interval_in_minutes).minutes.do(Notifier.send_notifications)
return

View File

@ -1,21 +1,21 @@
import datetime
from BreCal.stubs import generate_uuid1_int
from BreCal.schemas.model import Notification
from BreCal.schemas.model import Notification, NotificationType
def get_notification_simple():
"""creates a default notification, where 'created' is now, and modified is now+10 seconds"""
notification_id = generate_uuid1_int() # uid?
times_id = generate_uuid1_int() # uid?
level = 10
type = 0
shipcall_id = 85
level = 2
type = NotificationType.email
message = "hello world"
created = datetime.datetime.now()
modified = created+datetime.timedelta(seconds=10)
notification = Notification(
notification_id,
times_id,
shipcall_id,
level,
type,
message,