From 39001b37a3bb3eb02e5d29f96eea92efd45a8db0 Mon Sep 17 00:00:00 2001 From: Daniel Schick Date: Mon, 15 Dec 2025 17:56:41 +0100 Subject: [PATCH] Added logging and a lock (threading) to make sure all notifications are created --- .../BreCal/validators/validation_rules.py | 129 ++++++++++-------- 1 file changed, 69 insertions(+), 60 deletions(-) diff --git a/src/server/BreCal/validators/validation_rules.py b/src/server/BreCal/validators/validation_rules.py index 85f507d..a9d4a9c 100644 --- a/src/server/BreCal/validators/validation_rules.py +++ b/src/server/BreCal/validators/validation_rules.py @@ -5,11 +5,13 @@ import re import numpy as np import pandas as pd import datetime +import threading from BreCal.database.enums import StatusFlags from BreCal.validators.validation_rule_functions import ValidationRuleFunctions from BreCal.schemas.model import Shipcall from BreCal.local_db import getPoolConnection +_evaluation_lock = threading.Lock() class ValidationRules(ValidationRuleFunctions): """ @@ -74,79 +76,86 @@ class ValidationRules(ValidationRuleFunctions): return evaluation_state, violations def evaluate_shipcalls(self, shipcall_df:pd.DataFrame)->pd.DataFrame: - """apply 'evaluate_shipcall_from_df' to each individual shipcall in {shipcall_df}. Returns shipcall_df ('evaluation', 'evaluation_message', 'evaluation_time' and 'evaluation_notifications_sent' are updated)""" - evaluation_states_old = [state_old for state_old in shipcall_df.loc[:,"evaluation"]] - evaluation_states_old = [state_old if not pd.isna(state_old) else 0 for state_old in evaluation_states_old] - results = shipcall_df.apply(lambda x: self.evaluate_shipcall_from_df(x), axis=1).values # returns tuple (state, message) - # unbundle individual results. evaluation_states becomes an integer, violation - evaluation_states_new = [StatusFlags(res[0]).value for res in results] - violations = [",\r\n".join(res[1]) if len(res[1])>0 else None for res in results] - violations = [self.concise_evaluation_message_if_too_long(violation) for violation in violations] + # Acquire lock to prevent race conditions during evaluation and notification creation + with _evaluation_lock: + """apply 'evaluate_shipcall_from_df' to each individual shipcall in {shipcall_df}. Returnsshipcall_df ('evaluation', 'evaluation_message', 'evaluation_time' and 'evaluation_notifications_sent' are updated)""" + evaluation_states_old = [state_old for state_old in shipcall_df.loc[:,"evaluation"]] + evaluation_states_old = [state_old if not pd.isna(state_old) else 0 for state_old in evaluation_states_old] + results = shipcall_df.apply(lambda x: self.evaluate_shipcall_from_df(x), axis=1).values # returns tuple (state, message) - # build the list of evaluation times ('now', as isoformat) - #evaluation_time = self.get_notification_times(evaluation_states_new) + # unbundle individual results. evaluation_states becomes an integer, violation + evaluation_states_new = [StatusFlags(res[0]).value for res in results] + violations = [",\r\n".join(res[1]) if len(res[1])>0 else None for res in results] + violations = [self.concise_evaluation_message_if_too_long(violation) for violation in violations] - if evaluation_states_old is not None and evaluation_states_new is not None: - pooledConnection = None - try: - pooledConnection = getPoolConnection() - commands = pydapper.using(pooledConnection) + # build the list of evaluation times ('now', as isoformat) + #evaluation_time = self.get_notification_times(evaluation_states_new) - for shipcall_id, state_old_raw, state_new_raw, violation in zip(shipcall_df.index, evaluation_states_old, evaluation_states_new, violations): - state_old = int(state_old_raw) if state_old_raw is not None else 0 - state_new = int(state_new_raw) if state_new_raw is not None else 0 + if evaluation_states_old is not None and evaluation_states_new is not None: + pooledConnection = None + try: + pooledConnection = getPoolConnection() + commands = pydapper.using(pooledConnection) - if state_old == state_new: - continue + for shipcall_id, state_old_raw, state_new_raw, violation in zip(shipcall_df.index, evaluation_states_old, evaluation_states_new, violations): + state_old = int(state_old_raw) if state_old_raw is not None else 0 + state_new = int(state_new_raw) if state_new_raw is not None else 0 + logging.info(f"Shipcall {shipcall_id}: state_old={state_old}, state_new={state_new}") + if state_old == state_new: + continue - notification_type = 3 # RED (mapped to time_conflict) - send_notification = False + notification_type = 3 # RED (mapped to time_conflict) + send_notification = False - if state_new == 2: - match state_old: - case 0: - send_notification = True - case 1: - send_notification = True - notification_type = 6 # YELLOW (mapped to missing_data) - elif state_new == 3: - match state_old: - case 0: - send_notification = True - case 1: - send_notification = True - case 2: - send_notification = True + if state_new == 2: + match state_old: + case 0: + send_notification = True + case 1: + send_notification = True + notification_type = 6 # YELLOW (mapped to missing_data) + elif state_new == 3: + match state_old: + case 0: + send_notification = True + case 1: + send_notification = True + case 2: + send_notification = True - if send_notification: - query = f"INSERT INTO notification (shipcall_id, type, level, message) VALUES (?shipcall_id?, {notification_type}, 0, ?message?)" - commands.execute(query, param={"shipcall_id" : int(shipcall_id), "message" : violation}) + if send_notification: + logging.info(f"Creating notification for shipcall {shipcall_id}, type={notification_type}") + query = f"INSERT INTO notification (shipcall_id, type, level, message) VALUES (?shipcall_id?, {notification_type}, 0, ?message?)" + commands.execute(query, param={"shipcall_id" : int(shipcall_id), "message" : violation}) - if state_new == 1 and state_old != 0: # this resolves the conflict - query = f"SELECT * from notification where shipcall_id = ?shipcall_id? and type = {notification_type} and level = 0" - existing_notification = commands.query(query, param={"shipcall_id" : int(shipcall_id)}) - if len(existing_notification) > 0: - query = "DELETE from notification where id = ?id?" - commands.execute(query, param={"id" : existing_notification[0]["id"]}) - else: - query = "INSERT INTO notification (shipcall_id, type, level) VALUES (?shipcall_id?, 4, 0)" - commands.execute(query, param={"shipcall_id" : int(shipcall_id)}) - finally: - if pooledConnection is not None: - pooledConnection.close() + if state_new == 1 and state_old != 0: # this resolves the time conflict + logging.info(f"Resolving notifications for shipcall {shipcall_id}, type={notification_type}") + query = f"SELECT * from notification where shipcall_id = ?shipcall_id? and type = {notification_type} and level = 0" + existing_notification = commands.query(query, param={"shipcall_id" : int(shipcall_id)}) + logging.info(f"Found {len(existing_notification)} existing notifications (yet unsent)") + if len(existing_notification) > 0: + logging.info(f"Deleting notification id={existing_notification[0]['id']}") + query = "DELETE from notification where id = ?id?" + commands.execute(query, param={"id" : existing_notification[0]["id"]}) + else: + query = "INSERT INTO notification (shipcall_id, type, level) VALUES (?shipcall_id?, 4, 0)" + commands.execute(query, param={"shipcall_id" : int(shipcall_id)}) + finally: + if pooledConnection is not None: + pooledConnection.close() - # build the list of 'evaluation_notifications_sent'. The value is 'False', when a notification should be created - #evaluation_notifications_sent = self.get_notification_states(evaluation_states_old, evaluation_states_new) + # build the list of 'evaluation_notifications_sent'. The value is 'False', when a notification should be created + #evaluation_notifications_sent = self.get_notification_states(evaluation_states_old, evaluation_states_new) - # TODO: detect evaluation state changes and create notifications + # TODO: detect evaluation state changes and create notifications - shipcall_df.loc[:,"evaluation"] = evaluation_states_new - shipcall_df.loc[:,"evaluation_message"] = violations - #shipcall_df.loc[:,"evaluation_time"] = evaluation_time - #shipcall_df.loc[:,"evaluation_notifications_sent"] = evaluation_notifications_sent - return shipcall_df + shipcall_df.loc[:,"evaluation"] = evaluation_states_new + shipcall_df.loc[:,"evaluation_message"] = violations + #shipcall_df.loc[:,"evaluation_time"] = evaluation_time + #shipcall_df.loc[:,"evaluation_notifications_sent"] = evaluation_notifications_sent + return shipcall_df def concise_evaluation_message_if_too_long(self, violation): """