# git_brcal/src/server/BreCal/validators/validation_rules.py
#
# 246 lines
# 14 KiB
# Python

import copy
import logging
import pydapper
import re
import numpy as np
import pandas as pd
import datetime
import threading
from BreCal.database.enums import StatusFlags
from BreCal.validators.validation_rule_functions import ValidationRuleFunctions
from BreCal.schemas.model import Shipcall
from BreCal.local_db import getPoolConnection
_evaluation_lock = threading.Lock()
class ValidationRules(ValidationRuleFunctions):
    """
    An object that determines the traffic light state for validation and notification. The provided feedback
    ('green', 'yellow', 'red') determines whether the state is critical. States are reported as StatusFlags values.

    In case of a critical validation state, the user's input prompt may be interrupted and the user may be warned.
    In case of a critical notification state, the respective users will be automatically notified after n seconds. (#TODO_n_seconds_delay)
    """

    def __init__(self, sql_handler):
        """Forward sql_handler (the data provided for this query, e.g., json input) to the base class."""
        super().__init__(sql_handler)

    def evaluate(self, shipcall):
        """
        Apply all validation rules to a single shipcall.

        1.) prepare df_times, which every validation rule tends to use.
            calling this only once saves a lot of computational overhead
        2.) apply all validation rules

        A shipcall is reported as GREEN without running any rule when there are no times at all, or when
        the shipcall has no entry in the shipcall participant map.

        returns: (evaluation_state, violations)
            evaluation_state: the largest StatusFlags value reported by any violated rule
                              (StatusFlags.GREEN.value when no rule is violated)
            violations:       list of the described messages of all violated rules
        """
        # without any times there is nothing to validate
        all_df_times = self.sql_handler.df_dict.get('times', pd.DataFrame())  # -> pd.DataFrame
        if len(all_df_times) == 0:
            return (StatusFlags.GREEN.value, [])

        # a shipcall without participants is not validated either
        spm = self.sql_handler.df_dict["shipcall_participant_map"]
        if len(spm.loc[spm["shipcall_id"] == shipcall.id]) == 0:
            return (StatusFlags.GREEN.value, [])

        # filter by shipcall id
        df_times = self.sql_handler.get_times_of_shipcall(shipcall)

        # apply all validation rules; each rule yields a tuple (state, msg)
        evaluation_results = [rule(shipcall, df_times) for rule in self.get_validation_rule_functions()]

        # filter out all 'None' messages, which indicate that no violation occurred
        evaluation_results = [result for result in evaluation_results if result[1] is not None]

        # 'translate' all error codes into a readable, human-understandable format
        evaluation_results = [(state, self.describe_error_message(msg)) for (state, msg) in evaluation_results]

        if not evaluation_results:
            return (StatusFlags.GREEN.value, [])

        logging.info(f"Validation results for shipcall {shipcall.id}: {evaluation_results}")

        # the overall state is the maximum state flag of all violations
        evaluation_state = np.max(np.array([result[0].value for result in evaluation_results]))
        violations = [result[1] for result in evaluation_results]
        return (evaluation_state, violations)

    def evaluation_verbosity(self, evaluation_state, evaluation_results):
        """
        Build a verbose summary string for an evaluation outcome.

        A truthy evaluation_state yields the success message. Otherwise the number of violations is reported,
        followed by every element of evaluation_results on its own line, indented with a tab.
        """
        if evaluation_state:
            return "OK! The validation was successful. There are no rule violations."

        # every element (including the first one) is displayed in a new line with a tab
        verbose_string = "These are:" + "".join(f"\n\t{result}" for result in evaluation_results)
        return f"FAILED VALIDATION. There have been {len(evaluation_results)} violations. {verbose_string}"

    def evaluate_shipcall_from_df(self, x):
        """
        Evaluate one row of the shipcall dataframe.

        x: a row (pd.Series); its .name is used as the shipcall id unless the row itself carries an 'id' entry.
        returns: (evaluation_state, violations), see 'evaluate'
        """
        shipcall = Shipcall(**{'id': x.name, **x.to_dict()})
        return self.evaluate(shipcall)

    def evaluate_shipcalls(self, shipcall_df: pd.DataFrame) -> pd.DataFrame:
        """
        Apply 'evaluate_shipcall_from_df' to each individual shipcall in {shipcall_df}.

        For every shipcall whose state differs from its previous 'evaluation' value, notification rows are
        inserted or removed in the database (see '_create_state_change_notifications').

        The module-level evaluation lock is held for the whole call, to prevent race conditions during
        evaluation and notification creation.

        returns: shipcall_df, where the columns 'evaluation' and 'evaluation_message' are updated.
                 ('evaluation_time' and 'evaluation_notifications_sent' are not written by this method.)
        """
        with _evaluation_lock:
            # previous states; missing values (None / NaN) count as 0
            evaluation_states_old = [
                state_old if not pd.isna(state_old) else 0
                for state_old in shipcall_df.loc[:, "evaluation"]
            ]

            # each result is a tuple (state, violations)
            results = shipcall_df.apply(self.evaluate_shipcall_from_df, axis=1).values

            # unbundle individual results. StatusFlags(...) also rejects values that are not a known state
            evaluation_states_new = [StatusFlags(res[0]).value for res in results]
            violations = [",\r\n".join(res[1]) if len(res[1]) > 0 else None for res in results]
            violations = [self.concise_evaluation_message_if_too_long(violation) for violation in violations]

            pooledConnection = None
            try:
                pooledConnection = getPoolConnection()
                commands = pydapper.using(pooledConnection)

                for shipcall_id, state_old_raw, state_new_raw, violation in zip(
                    shipcall_df.index, evaluation_states_old, evaluation_states_new, violations
                ):
                    state_old = int(state_old_raw) if state_old_raw is not None else 0
                    state_new = int(state_new_raw) if state_new_raw is not None else 0
                    logging.info(f"Shipcall {shipcall_id}: state_old={state_old}, state_new={state_new}")

                    # only state changes can create or resolve notifications
                    if state_old == state_new:
                        continue

                    self._create_state_change_notifications(commands, shipcall_id, state_old, state_new, violation)
            finally:
                # always hand the connection back, even when a query failed
                if pooledConnection is not None:
                    pooledConnection.close()

            shipcall_df.loc[:, "evaluation"] = evaluation_states_new
            shipcall_df.loc[:, "evaluation_message"] = violations
            return shipcall_df

    def _create_state_change_notifications(self, commands, shipcall_id, state_old, state_new, violation):
        """
        Write the notification rows for one shipcall whose state changed from state_old to state_new.

        States are plain integers (0, 1, 2, 3); per 'identify_notification_state_change' these correspond to
        none, green, yellow and red.

        - change to 2 from 0 or 1:     insert a notification of type 6
        - change to 3 from 0, 1 or 2:  insert a notification of type 3
        - change to 1 from 2 or 3:     delete the first not-yet-sent (level 0) type 3 notification of the
                                       shipcall if one exists, otherwise insert a notification of type 4

        commands: pydapper commands object bound to an open connection
        violation: message stored with a newly inserted notification (may be None)
        """
        notification_type = 3  # RED (mapped to time_conflict)
        send_notification = False

        if state_new == 2:
            send_notification = state_old in (0, 1)
            # NOTE(review): the indentation of the original code was lost; this assignment is assumed to apply
            # to every change into state 2 (not only to the change 1 -> 2) - confirm against the repository.
            notification_type = 6  # YELLOW (mapped to missing_data)
        elif state_new == 3:
            send_notification = state_old in (0, 1, 2)

        if send_notification:
            logging.info(f"Creating notification for shipcall {shipcall_id}, type={notification_type}")
            # notification_type is one of the integer constants above, never external input
            query = f"INSERT INTO notification (shipcall_id, type, level, message) VALUES (?shipcall_id?, {notification_type}, 0, ?message?)"
            commands.execute(query, param={"shipcall_id" : int(shipcall_id), "message" : violation})

        if state_new == 1 and state_old != 0:  # this resolves the time conflict
            # notification_type is still 3 here, because state_new is neither 2 nor 3
            logging.info(f"Resolving notifications for shipcall {shipcall_id}, type={notification_type}")
            query = f"SELECT * from notification where shipcall_id = ?shipcall_id? and type = {notification_type} and level = 0"
            existing_notification = commands.query(query, param={"shipcall_id" : int(shipcall_id)})
            logging.info(f"Found {len(existing_notification)} existing notifications (yet unsent)")

            if len(existing_notification) > 0:
                # the notification has not been sent yet, so it is simply removed
                logging.info(f"Deleting notification id={existing_notification[0]['id']}")
                query = "DELETE from notification where id = ?id?"
                commands.execute(query, param={"id" : existing_notification[0]["id"]})
            else:
                query = "INSERT INTO notification (shipcall_id, type, level) VALUES (?shipcall_id?, 4, 0)"
                commands.execute(query, param={"shipcall_id" : int(shipcall_id)})

    def concise_evaluation_message_if_too_long(self, violation):
        """
        When many validation rules are violated at once, the resulting evaluation message may exceed 512 characters
        (which the mysql database allows). In these cases, a regular expression collects every text enclosed in
        curly braces and the message is replaced by a concise one, which only lists these matches (the violated rules).

        returns: violation unchanged if it is None or shorter than 512 characters, otherwise the concise message
        """
        if violation is None:
            return violation

        if len(violation) >= 512:
            concise = re.findall(r'{(.*?)\}', violation)

            # e.g.: Evaluation message too long. Violated Rules: ['Rule #0001C', 'Rule #0001H', 'Rule #0001F', 'Rule #0001G', 'Rule #0001L', 'Rule #0001M', 'Rule #0001J', 'Rule #0001K']
            violation = f"Evaluation message too long. Violated Rules: {concise}"
        return violation

    def undefined_method(self) -> tuple[StatusFlags, bool]:
        """this function should apply the ValidationRules to the respective .shipcall, in regards to .times"""
        return (StatusFlags.GREEN, False)  # (state, should_notify)

    def determine_notification_state(self, state_old, state_new):
        """
        this method determines state changes in the notification state. When the state increases, a user is notified about it.
        state order: (NONE = GREEN < YELLOW < RED)

        returns: False when a notification must be sent (the value for {evaluation_notifications_sent}), otherwise None
        """
        # identify a state increase
        should_notify = self.identify_notification_state_change(state_old=state_old, state_new=state_new)

        # when a state increases, a notification must be sent. Thereby, the field should be set to False ({evaluation_notifications_sent})
        evaluation_notifications_sent = False if bool(should_notify) else None
        return evaluation_notifications_sent

    def identify_notification_state_change(self, state_old, state_new) -> bool:
        """
        determines, whether the observed state change should trigger a notification.
        internally, this function maps the states to integers and determines, if the successor state is more severe than the predecessor.

        state changes trigger a notification in the following cases:
            green -> yellow
            green -> red
            yellow -> red
            (none -> yellow) or (none -> red)

        a missing (None) or 'none' previous state is treated like green, because state_old is raised to at least
        StatusFlags.GREEN.value. Hence, critical changes can be observed by simply checking with "greater than".

        returns bool, whether a notification should be triggered
        """
        # state_old is always considered at least 'Green'
        if state_old is None:
            state_old = StatusFlags.NONE.value
        state_old = max(int(state_old), StatusFlags.GREEN.value)
        return int(state_new) > int(state_old)

    def get_notification_times(self, evaluation_states_new) -> list[str]:
        """build the list of evaluation times ('now', as isoformat string), one entry per new evaluation state"""
        evaluation_times = [datetime.datetime.now().isoformat() for _ in evaluation_states_new]
        return evaluation_times

    def get_notification_states(self, evaluation_states_old, evaluation_states_new) -> list[bool | None]:
        """build the list of 'evaluation_notifications_sent'. The value is 'False', when a notification should be created, otherwise None"""
        evaluation_notifications_sent = [
            self.determine_notification_state(state_old=int(state_old), state_new=int(state_new))
            for state_old, state_new in zip(evaluation_states_old, evaluation_states_new)
        ]
        return evaluation_notifications_sent
def inspect_shipcall_evaluation(vr, sql_handler, shipcall_id):
    """
    # debug only!

    Debugging helper that runs the evaluation for one shipcall id, prints the previous state, the new state
    and whether the change would trigger a notification, and hands back all data involved.

    vr: object providing 'evaluate' and 'identify_notification_state_change'
    sql_handler: object whose df_dict holds the 'shipcall', 'times' and 'shipcall_participant_map' dataframes
    shipcall_id: index label of the shipcall within the 'shipcall' dataframe

    returns: result, shipcall_df (filtered by shipcall id), shipcall, spm (shipcall participant map, filtered by shipcall id), times_df (filtered by shipcall id)
    """
    all_shipcalls = sql_handler.df_dict.get("shipcall")

    # label slice keeps a dataframe, the plain label lookup yields the row itself
    shipcall_df = all_shipcalls.loc[shipcall_id:shipcall_id, :]
    row_fields = all_shipcalls.loc[shipcall_id].to_dict()
    shipcall = Shipcall(**{"id": shipcall_id, **row_fields})

    result = vr.evaluate(shipcall=shipcall)

    previous_state = int(shipcall.evaluation)
    notification_state = vr.identify_notification_state_change(
        state_old=previous_state,
        state_new=int(result[0]),
    )
    print(f"Previous state: {previous_state}, New State: {result[0]}, Notification State: {notification_state}")

    all_times = sql_handler.df_dict.get("times")
    times_df = all_times.loc[all_times["shipcall_id"] == shipcall_id]

    participant_map = sql_handler.df_dict["shipcall_participant_map"]
    spm = participant_map.loc[participant_map["shipcall_id"] == shipcall_id]

    return result, shipcall_df, shipcall, spm, times_df