diff --git a/src/server/BreCal/impl/shipcalls.py b/src/server/BreCal/impl/shipcalls.py index 0eb226b..0035c7b 100644 --- a/src/server/BreCal/impl/shipcalls.py +++ b/src/server/BreCal/impl/shipcalls.py @@ -136,7 +136,7 @@ def PostShipcalls(schemaModel): new_id = commands.execute_scalar("select last_insert_id()") shipdata = get_ship_data_for_id(schemaModel["ship_id"]) - message = shipdata['name'] + message = "The participant has been assigned to the shipcall." if "type_value" in schemaModel: match schemaModel["type_value"]: case 1: @@ -206,7 +206,7 @@ def PutShipcalls(schemaModel, original_payload=None): theshipcall = commands.query_single_or_default("SELECT * FROM shipcall where id = ?id?", sentinel, param={"id" : schemaModel["id"]}) if theshipcall is sentinel: return json.dumps("no such record"), 404, {'Content-Type': 'application/json; charset=utf-8'} - + was_canceled = theshipcall["canceled"] provided_keys = set(original_payload.keys()) if isinstance(original_payload, dict) else None @@ -242,20 +242,6 @@ def PutShipcalls(schemaModel, original_payload=None): query = "UPDATE shipcall SET " + ", ".join(update_clauses) + " WHERE id = ?id?" commands.execute(query, param=schemaModel) - ship_id_value = schemaModel.get("ship_id") if (provided_keys is None or "ship_id" in provided_keys) else theshipcall["ship_id"] - - shipdata = get_ship_data_for_id(ship_id_value) - message = shipdata['name'] - type_value = schemaModel.get("type_value") if (provided_keys is None or "type" in provided_keys) else theshipcall["type"] - if type_value is not None: - match type_value: - case 1: - message += " [ARRIVAL]" - case 2: - message += " [DEPARTURE]" - case 3: - message += " [SHIFTING]" - # pquery = SQLQuery.get_shipcall_participant_map_by_shipcall_id() pquery = "SELECT id, participant_id, type FROM shipcall_participant_map where shipcall_id = ?id?" pdata = commands.query(pquery,param={"id" : schemaModel["id"]}) # existing list of assignments @@ -263,7 +249,7 @@ def PutShipcalls(schemaModel, original_payload=None): if schemaModel.get("participants") is None: schemaModel["participants"] = [] - # loop across passed participant ids, creating entries for those not present in pdata + # loop across passed participant ids, creating entries for those not present in pdata existing_notifications = get_notification_for_shipcall_and_type(schemaModel["id"], 1) # type = 1 is assignment @@ -274,6 +260,7 @@ def PutShipcalls(schemaModel, original_payload=None): found_participant = True break if not found_participant: + message = "The participant has been assigned to the shipcall." # nquery = SQLQuery.get_shipcall_post_update_shipcall_participant_map() spquery = "INSERT INTO shipcall_participant_map (shipcall_id, participant_id, type) VALUES (?shipcall_id?, ?participant_id?, ?type?)" commands.execute(spquery, param={"shipcall_id" : schemaModel["id"], "participant_id" : participant_assignment["participant_id"], "type" : participant_assignment["type"]}) @@ -307,18 +294,20 @@ def PutShipcalls(schemaModel, original_payload=None): commands.execute(nquery, param={"nid" : existing_notification["id"]}) else: # create un-assignment notification + message = "The participant has been unassigned from the shipcall." nquery = "INSERT INTO notification (shipcall_id, participant_id, level, type, message) VALUES (?shipcall_id?, ?participant_id?, 0, 5, ?message?)" commands.execute(nquery, param={"shipcall_id" : schemaModel["id"], "participant_id" : elem["participant_id"], "message" : message}) break - + canceled_value = schemaModel.get("canceled") if canceled_value is not None: if canceled_value and not was_canceled: + message = "The shipcall has been canceled." # create a canceled notification for all currently assigned participants stornoNotificationQuery = "INSERT INTO notification (shipcall_id, participant_id, level, type, message) VALUES (?shipcall_id?, ?participant_id?, 0, 7, ?message?)" for participant_assignment in schemaModel["participants"]: commands.execute(stornoNotificationQuery, param={"shipcall_id" : schemaModel["id"], "participant_id" : participant_assignment["participant_id"], "message" : message}) - + # save history data # TODO: set ETA properly # query = SQLQuery.create_sql_query_history_put() diff --git a/src/server/BreCal/services/schedule_routines.py b/src/server/BreCal/services/schedule_routines.py index 69e929f..81f62a4 100644 --- a/src/server/BreCal/services/schedule_routines.py +++ b/src/server/BreCal/services/schedule_routines.py @@ -314,7 +314,8 @@ def eval_next_24_hrs(): found_notification = True break if not found_notification: - commands.execute(nquery, param={"shipcall_id":shipcall["id"], "participant_id": participant["participant_id"], "message":shipcall["name"]}) + message = "The shipcall is scheduled to arrive/depart within the next 24 hours." + commands.execute(nquery, param={"shipcall_id":shipcall["id"], "participant_id": participant["participant_id"], "message":message}) except Exception as ex: logging.error(ex) diff --git a/src/server/BreCal/validators/validation_rules.py b/src/server/BreCal/validators/validation_rules.py index 03ce506..454dd26 100644 --- a/src/server/BreCal/validators/validation_rules.py +++ b/src/server/BreCal/validators/validation_rules.py @@ -5,11 +5,13 @@ import re import numpy as np import pandas as pd import datetime +import threading from BreCal.database.enums import StatusFlags from BreCal.validators.validation_rule_functions import ValidationRuleFunctions from BreCal.schemas.model import Shipcall from BreCal.local_db import getPoolConnection +_evaluation_lock = threading.Lock() class ValidationRules(ValidationRuleFunctions): """ @@ -74,37 +76,52 @@ class ValidationRules(ValidationRuleFunctions): return evaluation_state, violations def evaluate_shipcalls(self, shipcall_df:pd.DataFrame)->pd.DataFrame: - """apply 'evaluate_shipcall_from_df' to each individual shipcall in {shipcall_df}. Returns shipcall_df ('evaluation', 'evaluation_message', 'evaluation_time' and 'evaluation_notifications_sent' are updated)""" - evaluation_states_old = [state_old for state_old in shipcall_df.loc[:,"evaluation"]] - evaluation_states_old = [state_old if not pd.isna(state_old) else 0 for state_old in evaluation_states_old] - results = shipcall_df.apply(lambda x: self.evaluate_shipcall_from_df(x), axis=1).values # returns tuple (state, message) - # unbundle individual results. evaluation_states becomes an integer, violation - evaluation_states_new = [StatusFlags(res[0]).value for res in results] - violations = [",\r\n".join(res[1]) if len(res[1])>0 else None for res in results] - violations = [self.concise_evaluation_message_if_too_long(violation) for violation in violations] + # Acquire lock to prevent race conditions during evaluation and notification creation + with _evaluation_lock: + """apply 'evaluate_shipcall_from_df' to each individual shipcall in {shipcall_df}. Returnsshipcall_df ('evaluation', 'evaluation_message', 'evaluation_time' and 'evaluation_notifications_sent' are updated)""" + evaluation_states_old = [state_old for state_old in shipcall_df.loc[:,"evaluation"]] + evaluation_states_old = [state_old if not pd.isna(state_old) else 0 for state_old in evaluation_states_old] + results = shipcall_df.apply(lambda x: self.evaluate_shipcall_from_df(x), axis=1).values # returns tuple (state, message) - # build the list of evaluation times ('now', as isoformat) - #evaluation_time = self.get_notification_times(evaluation_states_new) + # unbundle individual results. evaluation_states becomes an integer, violation + evaluation_states_new = [StatusFlags(res[0]).value for res in results] + violations = [",\r\n".join(res[1]) if len(res[1])>0 else None for res in results] + violations = [self.concise_evaluation_message_if_too_long(violation) for violation in violations] - send_notification = False - if evaluation_states_old is not None and evaluation_states_new is not None: - if len(evaluation_states_old) == 1 and len(evaluation_states_new) == 1: - if evaluation_states_old[0] != evaluation_states_new[0]: - pooledConnection = None - try: - pooledConnection = getPoolConnection() - commands = pydapper.using(pooledConnection) - notification_type = 3 # RED (mapped to time_conflict) - if evaluation_states_new[0] == 2: - match evaluation_states_old[0]: + # build the list of evaluation times ('now', as isoformat) + #evaluation_time = self.get_notification_times(evaluation_states_new) + + if evaluation_states_old is not None and evaluation_states_new is not None: + pooledConnection = None + participants = None + try: + pooledConnection = getPoolConnection() + commands = pydapper.using(pooledConnection) + + for shipcall_id, state_old_raw, state_new_raw, violation in zip(shipcall_df.index, evaluation_states_old, evaluation_states_new, violations): + state_old = int(state_old_raw) if state_old_raw is not None else 0 + state_new = int(state_new_raw) if state_new_raw is not None else 0 + logging.info(f"Shipcall {shipcall_id}: state_old={state_old}, state_new={state_new}") + if state_old == state_new: + continue + + if participants is None: + participant_query = "SELECT participant_id, type FROM shipcall_participant_map WHERE shipcall_id = ?shipcall_id?" + participants = [participant for participant in commands.query(participant_query, model=dict, param={"shipcall_id" : int(shipcall_id)}) if participant.get("type") != 1] + + notification_type = 3 # RED (mapped to time_conflict) + send_notification = False + + if state_new == 2: + match state_old: case 0: send_notification = True case 1: send_notification = True notification_type = 6 # YELLOW (mapped to missing_data) - if evaluation_states_new[0] == 3: - match evaluation_states_old[0]: + elif state_new == 3: + match state_old: case 0: send_notification = True case 1: @@ -113,33 +130,35 @@ class ValidationRules(ValidationRuleFunctions): send_notification = True if send_notification: - query = f"INSERT INTO notification (shipcall_id, type, level, message) VALUES (?shipcall_id?, {notification_type}, 0, ?message?)" - commands.execute(query, param={"shipcall_id" : int(shipcall_df.index[0]), "message" : violations[0]}) + logging.info(f"Creating notification(s) for shipcall {shipcall_id}, type={notification_type}") + query = f"INSERT INTO notification (shipcall_id, participant_id, type, level, message) VALUES (?shipcall_id?, ?participant_id?, {notification_type}, 0, ?message?)" + for participant in participants: + commands.execute(query, param={"shipcall_id" : int(shipcall_id), "participant_id" : participant["participant_id"], "message" : violation}) - if evaluation_states_new[0] == 1 and evaluation_states_old[0] != 0: # this resolves the conflict - query = f"SELECT * from notification where shipcall_id = ?shipcall_id? and type = {notification_type} and level = 0" - existing_notification = commands.query(query, param={"shipcall_id" : int(shipcall_df.index[0])}) - if len(existing_notification) > 0: - query = "DELETE from notification where id = ?id?" - commands.execute(query, param={"id" : existing_notification[0]["id"]}) - else: - query = "INSERT INTO notification (shipcall_id, type, level) VALUES (?shipcall_id?, 4, 0)" - commands.execute(query, param={"shipcall_id" : int(shipcall_df.index[0])}) - finally: - if pooledConnection is not None: - pooledConnection.close() + if state_new == 1 and state_old != 0: # this resolves the time conflict + logging.info(f"Resolving notifications for shipcall {shipcall_id}, type={notification_type}") + query = f"DELETE from notification where shipcall_id = ?shipcall_id? and type = {notification_type} and level = 0" + deleted_count = commands.execute(query, param={"shipcall_id" : int(shipcall_id)}) + logging.info(f"Deleted {deleted_count} existing notifications (yet unsent)") + if deleted_count == 0: + query = "INSERT INTO notification (shipcall_id, participant_id, type, level) VALUES (?shipcall_id?, ?participant_id?, 4, 0)" + for participant in participants: + commands.execute(query, param={"shipcall_id" : int(shipcall_id), "participant_id" : participant["participant_id"]}) + finally: + if pooledConnection is not None: + pooledConnection.close() - # build the list of 'evaluation_notifications_sent'. The value is 'False', when a notification should be created - #evaluation_notifications_sent = self.get_notification_states(evaluation_states_old, evaluation_states_new) + # build the list of 'evaluation_notifications_sent'. The value is 'False', when a notification should be created + #evaluation_notifications_sent = self.get_notification_states(evaluation_states_old, evaluation_states_new) - # TODO: detect evaluation state changes and create notifications + # TODO: detect evaluation state changes and create notifications - shipcall_df.loc[:,"evaluation"] = evaluation_states_new - shipcall_df.loc[:,"evaluation_message"] = violations - #shipcall_df.loc[:,"evaluation_time"] = evaluation_time - #shipcall_df.loc[:,"evaluation_notifications_sent"] = evaluation_notifications_sent - return shipcall_df + shipcall_df.loc[:,"evaluation"] = evaluation_states_new + shipcall_df.loc[:,"evaluation_message"] = violations + #shipcall_df.loc[:,"evaluation_time"] = evaluation_time + #shipcall_df.loc[:,"evaluation_notifications_sent"] = evaluation_notifications_sent + return shipcall_df def concise_evaluation_message_if_too_long(self, violation): """