import copy
import datetime
import logging
import re

import numpy as np
import pandas as pd
import pydapper

from BreCal.database.enums import StatusFlags
from BreCal.validators.validation_rule_functions import ValidationRuleFunctions
from BreCal.schemas.model import Shipcall
from BreCal.local_db import getPoolConnection


def _sync_shipcall_notification(shipcall_id, state_old, state_new, message):
    """
    Create or resolve rows in the 'notification' table after the evaluation state of a
    single shipcall has changed.

    shipcall_id: integer id that is written to / looked up in notification.shipcall_id
    state_old:   previous evaluation state (0 is used by the caller for a missing value)
    state_new:   newly computed evaluation state
    message:     evaluation message that is stored with a newly created notification

    The states are compared against the literal values 0, 1, 2 and 3.
    (presumably 0=none, 1=green, 2=yellow, 3=red -- verify against StatusFlags)
    """
    notification_type = 3  # RED (mapped to time_conflict)
    send_notification = False

    if state_new == 2:
        send_notification = state_old in (0, 1)
        notification_type = 6  # YELLOW (mapped to missing_data)
    elif state_new == 3:
        send_notification = state_old in (0, 1, 2)

    # the state dropped back to 1 from a known (non-zero) state: this resolves the conflict
    resolves_conflict = state_new == 1 and state_old != 0

    # nothing to write (e.g. 3 -> 2): do not take a connection from the pool at all
    if not (send_notification or resolves_conflict):
        return

    pooled_connection = None
    try:
        pooled_connection = getPoolConnection()
        commands = pydapper.using(pooled_connection)

        if send_notification:
            query = f"INSERT INTO notification (shipcall_id, type, level, message) VALUES (?shipcall_id?, {notification_type}, 0, ?message?)"
            commands.execute(query, param={"shipcall_id": shipcall_id, "message": message})

        if resolves_conflict:
            # NOTE(review): state_new == 1 never passes through the YELLOW branch above, so this
            # lookup always uses type 3. A notification that was created with type 6 is therefore
            # never matched here -- confirm that this is intended.
            query = f"SELECT * from notification where shipcall_id = ?shipcall_id? and type = {notification_type} and level = 0"
            existing_notification = commands.query(query, param={"shipcall_id": shipcall_id})
            if len(existing_notification) > 0:
                # a level-0 notification still exists: remove it
                query = "DELETE from notification where id = ?id?"
                commands.execute(query, param={"id": existing_notification[0]["id"]})
            else:
                # no such notification is left: insert a notification of type 4 instead
                query = "INSERT INTO notification (shipcall_id, type, level) VALUES (?shipcall_id?, 4, 0)"
                commands.execute(query, param={"shipcall_id": shipcall_id})
    finally:
        # always hand the connection back, even when a statement raised
        if pooled_connection is not None:
            pooled_connection.close()


class ValidationRules(ValidationRuleFunctions):
    """
    An object that determines the traffic light state for validation and notification. The provided feedback ('green', 'yellow', 'red')
    determines, whether the state is critical. It uses ValidationRuleState enumerations.

    In case of a critical validation state, the user's input prompt may be interrupted and the user may be warned.
    In case of a critical notification state, the respective users will be automatically notified after n seconds. (#TODO_n_seconds_delay)
    """

    def __init__(self, sql_handler):
        """
        sql_handler: handed to the parent constructor. The methods of this class read
                     sql_handler.df_dict and call sql_handler.get_times_of_shipcall.
        """
        # use the entire data that is provided for this query (e.g., json input)
        super().__init__(sql_handler)

    def evaluate(self, shipcall):
        """
        1.) prepare df_times, which every validation rule tends to use
            calling this only once saves a lot of computational overhead
        2.) apply all validation rules

        A shipcall is reported as GREEN without violations, when there are no times at all
        or when the shipcall has no entry in the shipcall participant map.

        returns: (evaluation_state, violations)
        """
        # prepare df_times, which every validation rule tends to use
        all_df_times = self.sql_handler.df_dict.get('times', pd.DataFrame())  # -> pd.DataFrame
        if len(all_df_times) == 0:
            return (StatusFlags.GREEN.value, [])

        spm = self.sql_handler.df_dict["shipcall_participant_map"]
        if len(spm.loc[spm["shipcall_id"] == shipcall.id]) == 0:
            return (StatusFlags.GREEN.value, [])

        # filter by shipcall id
        df_times = self.sql_handler.get_times_of_shipcall(shipcall)

        # apply all validation rules
        # list of tuples, where each element is (state, msg)
        raw_results = [rule(shipcall, df_times) for rule in self.get_validation_rule_functions()]

        # filter out all 'None' results, which indicate that no violation occurred.
        raw_results = [raw_result for raw_result in raw_results if raw_result[1] is not None]

        # 'translate' all error codes into readable, human-understandable format.
        evaluation_results = [(state, self.describe_error_message(msg)) for (state, msg) in raw_results]

        if evaluation_results:
            logging.info(f"Validation results for shipcall {shipcall.id}: {evaluation_results}")

        # check, what the maximum state flag is and return it
        if len(evaluation_results) > 0:
            evaluation_state = np.max(np.array([result[0].value for result in evaluation_results]))
        else:
            evaluation_state = StatusFlags.GREEN.value

        violations = [result[1] for result in evaluation_results]
        return (evaluation_state, violations)

    def evaluation_verbosity(self, evaluation_state, evaluation_results):
        """
        This function suggests a verbose summary for the evaluation results.
        Based on the 'True'/'False' evaluation outcome, the returned string is different.

        evaluation_state:   truthy, when the validation was successful
        evaluation_results: list of violation messages (strings)
        """
        if evaluation_state:
            return "OK! The validation was successful. There are no rule violations."

        # every element of the list is displayed in a new line with a tab (including the first one)
        verbose_string = "These are:" + "".join(f"\n\t{violation}" for violation in evaluation_results)
        return f"FAILED VALIDATION. There have been {len(evaluation_results)} violations. {verbose_string}"

    def evaluate_shipcall_from_df(self, x):
        """
        Evaluate one row of the shipcall dataframe.

        x: row (pd.Series) of the shipcall dataframe. The row label (x.name) is used as the shipcall id,
           unless the row itself contains an 'id' entry, which then takes precedence.

        returns: (evaluation_state, violations)
        """
        shipcall = Shipcall(**{'id': x.name, **x.to_dict()})
        evaluation_state, violations = self.evaluate(shipcall)
        return evaluation_state, violations

    def evaluate_shipcalls(self, shipcall_df: pd.DataFrame) -> pd.DataFrame:
        """
        apply 'evaluate_shipcall_from_df' to each individual shipcall in {shipcall_df}.

        When {shipcall_df} holds exactly one shipcall and its evaluation state changed, the
        'notification' table is updated as well (see _sync_shipcall_notification).

        Returns shipcall_df ('evaluation' and 'evaluation_message' are updated in place)
        """
        # previous states. Missing values are treated as 0
        evaluation_states_old = [
            0 if pd.isna(state_old) else state_old
            for state_old in shipcall_df.loc[:, "evaluation"]
        ]

        # every element is a tuple (state, messages)
        results = shipcall_df.apply(self.evaluate_shipcall_from_df, axis=1).values

        # unbundle individual results. The states become plain enum values, the messages of a
        # shipcall are joined into one string (None, when there is no violation)
        evaluation_states_new = [StatusFlags(res[0]).value for res in results]
        violations = [",\r\n".join(res[1]) if len(res[1]) > 0 else None for res in results]
        violations = [self.concise_evaluation_message_if_too_long(violation) for violation in violations]

        # notifications are only created/resolved, when a single shipcall was evaluated
        if len(evaluation_states_old) == 1 and len(evaluation_states_new) == 1:
            if evaluation_states_old[0] != evaluation_states_new[0]:
                _sync_shipcall_notification(
                    shipcall_id=int(shipcall_df.index[0]),
                    state_old=evaluation_states_old[0],
                    state_new=evaluation_states_new[0],
                    message=violations[0],
                )

        # TODO: 'evaluation_time' and 'evaluation_notifications_sent' are currently not written
        #       (see get_notification_times and get_notification_states)
        shipcall_df.loc[:, "evaluation"] = evaluation_states_new
        shipcall_df.loc[:, "evaluation_message"] = violations
        return shipcall_df

    def concise_evaluation_message_if_too_long(self, violation):
        """
        when many validation rules are violated at once, the resulting evaluation message may exceed 512 characters (which the mysql database allows)
        in these cases, use a regular expression to provide a concise message, where the 'concise' description is only the list of violated rules

        violation: joined evaluation message or None
        returns:   the unchanged input, when it is None or shorter than 512 characters. Otherwise the concise message
        """
        if violation is None:
            return violation

        if len(violation) >= 512:
            # collect everything that is enclosed in curly braces
            concise = re.findall(r'{(.*?)\}', violation)

            # e.g.: Evaluation message too long. Violated Rules: ['Rule #0001C', 'Rule #0001H', 'Rule #0001F', 'Rule #0001G', 'Rule #0001L', 'Rule #0001M', 'Rule #0001J', 'Rule #0001K']
            violation = f"Evaluation message too long. Violated Rules: {concise}"
        return violation

    def undefined_method(self) -> tuple:
        """this function should apply the ValidationRules to the respective .shipcall, in regards to .times"""
        return (StatusFlags.GREEN, False)  # (state, should_notify:bool)

    def determine_notification_state(self, state_old, state_new):
        """
        this method determines state changes in the notification state. When the state increases, a user is notified about it.
        state order: (NONE = GREEN < YELLOW < RED)

        returns: False, when a notification must be sent, otherwise None
        """
        # identify a state increase
        should_notify = self.identify_notification_state_change(state_old=state_old, state_new=state_new)

        # when a state increases, a notification must be sent. Thereby, the field should be set to False ({evaluation_notifications_sent})
        evaluation_notifications_sent = False if bool(should_notify) else None
        return evaluation_notifications_sent

    def identify_notification_state_change(self, state_old, state_new) -> bool:
        """
        determines, whether the observed state change should trigger a notification.
        internally, this function maps StatusFlags to an integer and determines, if the successor state is more severe than the predecessor.

        state changes trigger a notification in the following cases:
        green -> yellow
        green -> red
        yellow -> red

        (none -> yellow) or (none -> red)
        due to the values in the enumeration objects, the states are mapped to provide this function.
        green=1, yellow=2, red=3, none=1. Hence, critical changes can be observed by simply checking with "greater than".

        returns bool, whether a notification should be triggered
        """
        # state_old is always considered at least 'Green' (1)
        if state_old is None:
            state_old = StatusFlags.NONE.value
        state_old = max(int(state_old), StatusFlags.GREEN.value)
        return int(state_new) > int(state_old)

    def get_notification_times(self, evaluation_states_new) -> list[str]:
        """build the list of evaluation times ('now', as isoformat string). One entry per evaluation state"""
        evaluation_times = [datetime.datetime.now().isoformat() for _ in evaluation_states_new]
        return evaluation_times

    def get_notification_states(self, evaluation_states_old, evaluation_states_new) -> list[bool | None]:
        """build the list of 'evaluation_notifications_sent'. The value is 'False', when a notification should be created (otherwise None)"""
        evaluation_notifications_sent = [
            self.determine_notification_state(state_old=int(state_old), state_new=int(state_new))
            for state_old, state_new in zip(evaluation_states_old, evaluation_states_new)
        ]
        return evaluation_notifications_sent


def inspect_shipcall_evaluation(vr, sql_handler, shipcall_id):
    """
    # debug only!

    a simple debugging function, which serves in inspecting an evaluation function for a single shipcall id. It returns the result and all related data.
    The previous state, the new state and the notification state are printed.

    vr:          object that provides 'evaluate' and 'identify_notification_state_change' (e.g., ValidationRules)
    sql_handler: object, whose df_dict holds the 'shipcall', 'times' and 'shipcall_participant_map' dataframes
    shipcall_id: index label of the shipcall in the 'shipcall' dataframe

    returns: result, shipcall_df (filtered by shipcall id), shipcall, spm (shipcall participant map, filtered by shipcall id), times_df (filtered by shipcall id)
    """
    all_shipcalls = sql_handler.df_dict.get("shipcall")

    # label-based slice, so that the single row is kept as a dataframe
    shipcall_df = all_shipcalls.loc[shipcall_id:shipcall_id, :]
    shipcall = Shipcall(**{"id": shipcall_id, **all_shipcalls.loc[shipcall_id].to_dict()})

    result = vr.evaluate(shipcall=shipcall)
    notification_state = vr.identify_notification_state_change(state_old=int(shipcall.evaluation), state_new=int(result[0]))
    print(f"Previous state: {int(shipcall.evaluation)}, New State: {result[0]}, Notification State: {notification_state}")

    times_df = sql_handler.df_dict.get("times")
    times_df = times_df.loc[times_df["shipcall_id"] == shipcall_id]

    spm = sql_handler.df_dict["shipcall_participant_map"]
    spm = spm.loc[spm["shipcall_id"] == shipcall_id]

    return result, shipcall_df, shipcall, spm, times_df