git_brcal/src/server/BreCal/validators/input_validation_times.py

524 lines
29 KiB
Python

import typing
import json
import datetime
from abc import ABC, abstractmethod
from marshmallow import ValidationError
from string import ascii_letters, digits
from BreCal.schemas.model import Ship, Shipcall, Berth, User, Participant, ShipcallType, Times
from BreCal.impl.participant import GetParticipant
from BreCal.impl.ships import GetShips
from BreCal.impl.berths import GetBerths
from BreCal.impl.times import GetTimes
from BreCal.database.enums import ParticipantType, ParticipantFlag
from BreCal.validators.input_validation_utils import check_if_user_is_bsmd_type, check_if_ship_id_is_valid, check_if_berth_id_is_valid, check_if_participant_ids_are_valid, check_if_participant_ids_and_types_are_valid, check_if_shipcall_id_is_valid, get_shipcall_id_dictionary, get_participant_type_from_user_data, get_participant_id_dictionary, check_if_participant_id_is_valid_standalone
from BreCal.database.sql_queries import SQLQuery
from BreCal.database.sql_handler import execute_sql_query_standalone
from BreCal.validators.validation_base_utils import check_if_int_is_valid_flag, check_if_string_has_special_characters
import werkzeug
def build_post_data_type_dependent_required_fields_dict()->dict[ShipcallType,dict[ParticipantType,typing.Optional[list[str]]]]:
    """
    Create the lookup table of type-dependent required fields for POST-requests.

    The table is nested: ShipcallType -> ParticipantType -> list of required field names.
    In this version every combination maps to an empty list, i.e., no type-dependent field
    is required. The participant types 'undefined' and 'BSMD' should not be set in
    POST-requests; they are part of the table nonetheless and map to an empty list as well.

    returns:
        a freshly built dictionary. Every entry holds its own list object.
    """
    covered_shipcall_types = (
        ShipcallType.arrival,
        ShipcallType.departure,
        ShipcallType.shifting,
    )
    covered_participant_types = (
        ParticipantType.undefined,            # should not be set in POST requests
        ParticipantType.BSMD,                 # should not be set in POST requests
        ParticipantType.TERMINAL,
        ParticipantType.AGENCY,
        ParticipantType.MOORING,
        ParticipantType.PILOT,
        ParticipantType.PORT_ADMINISTRATION,
        ParticipantType.TUG,
    )
    # the comprehension evaluates '[]' once per entry, so no list is shared between entries
    return {
        shipcall_type: {participant_type: [] for participant_type in covered_participant_types}
        for shipcall_type in covered_shipcall_types
    }
class InputValidationTimes():
    """
    This class combines the individual input validation rules for 'times' datasets into a joint object.
    It uses static methods, so the object does not need to be instantiated, but functions can be called immediately.

    Entry points (one per request type):
        InputValidationTimes.evaluate_post_data(user_data, loadedModel, content)
        InputValidationTimes.evaluate_put_data(user_data, loadedModel, content)
        InputValidationTimes.evaluate_delete_data(user_data, times_id)

    When the data violates one of the rules, a marshmallow.ValidationError is raised, which details the issue.
    """
    def __init__(self) -> None:
        pass

    @staticmethod
    def evaluate_post_data(user_data:dict, loadedModel:dict, content:dict):
        """
        Validate a POST-request, which creates a new times dataset.

        args:
            user_data: data of the requesting user (must contain 'participant_id')
            loadedModel: the request data after it was loaded by the schema
            content: the raw request data

        raises:
            ValidationError, as soon as the first rule is violated
        """
        # 0.) Check for the presence of required fields
        InputValidationTimes.check_times_required_fields_post_data(loadedModel, content)

        # 1.) datasets may only be created, if the current user fits the appropriate type in the ShipcallParticipantMap
        InputValidationTimes.check_if_user_fits_shipcall_participant_map(user_data, loadedModel, content)

        # 2.) datasets may only be created, if the respective participant type did not already create one.
        InputValidationTimes.check_if_entry_already_exists_for_participant_type(user_data, loadedModel, content)

        # 3.) Reference checking
        InputValidationTimes.check_dataset_references(content)

        # 4.) Value checking
        InputValidationTimes.check_dataset_values(user_data, loadedModel, content)
        return

    @staticmethod
    def evaluate_put_data(user_data:dict, loadedModel:dict, content:dict):
        """
        Validate a PUT-request, which updates an existing times dataset.

        args:
            user_data: data of the requesting user (must contain 'participant_id')
            loadedModel: the request data after it was loaded by the schema
            content: the raw request data

        raises:
            ValidationError, as soon as the first rule is violated
        """
        # 1.) Check for the presence of required fields
        InputValidationTimes.check_times_required_fields_put_data(content)

        # 2.) Only users of the same participant_id, which the times dataset refers to, can update the entry
        InputValidationTimes.check_user_belongs_to_same_group_as_dataset_determines(user_data, loadedModel=loadedModel, times_id=None)

        # 3.) Reference checking
        InputValidationTimes.check_dataset_references(content)

        # 4.) Value checking
        InputValidationTimes.check_dataset_values(user_data, loadedModel, content)
        return

    @staticmethod
    def evaluate_delete_data(user_data:dict, times_id:typing.Optional[int]):
        """
        Validate a DELETE-request for the times dataset with the given id.

        args:
            user_data: data of the requesting user (must contain 'participant_id')
            times_id: id of the times dataset. Values, which are not yet integers (e.g., strings), are converted.

        raises:
            ValidationError, when no id is provided, the id cannot be converted to an integer,
            the entry does not exist (anymore) or the user is not allowed to delete the entry.
        """
        # 0.) an ID reference must be provided and will be converted to int
        if times_id is None:
            raise ValidationError({"id":"no times id provided"})

        if not isinstance(times_id, int):
            try:
                times_id = int(times_id)
            except (TypeError, ValueError):
                # a malformed id is a client error, so it is reported like any other validation issue
                raise ValidationError({"id":f"the provided times id is not a valid integer: {times_id!r}"}) from None

        # 1.) The dataset entry may not be deleted already
        InputValidationTimes.check_if_entry_is_already_deleted(times_id)

        # 2.) Only users of the same participant_id, which the times dataset refers to, can delete the entry
        InputValidationTimes.check_user_belongs_to_same_group_as_dataset_determines(user_data, loadedModel=None, times_id=times_id)
        return

    @staticmethod
    def check_if_entry_is_already_deleted(times_id:int):
        """
        When calling a delete request for times, the dataset may not be deleted already. This method
        makes sure, that the requested ID has a matching entry in the database.

        When a times dataset is deleted, it is directly removed from the database.
        To identify deleted entries, query from the database and check, whether there is a match for the times id.

        raises:
            ValidationError, when the query does not return any entry for the times id
        """
        # perform an SQL query. Creates a pooled connection internally, queries the database, then closes the connection.
        query = "SELECT shipcall_id FROM times WHERE id = ?id?"
        pdata = execute_sql_query_standalone(query=query, param={"id":times_id}, pooledConnection=None)

        # an empty (or missing) result means, that there is no entry with this id
        if not pdata:
            raise ValidationError({"deleted":f"The selected time entry is already deleted. ID: {times_id}"})
        return

    @staticmethod
    def check_user_is_not_bsmd_type(user_data:dict):
        """a new dataset may only be created by a user who is *not* belonging to participant group BSMD"""
        if check_if_user_is_bsmd_type(user_data):
            raise ValidationError({"participant_type":f"current user belongs to BSMD. Cannot post 'times' datasets. Found user data: {user_data}"})
        return

    @staticmethod
    def check_dataset_values(user_data:dict, loadedModel:dict, content:dict):
        """
        this method validates POST and PUT data. Most of the dataset arguments are validated directly in the
        BreCal.schemas.model.TimesSchema, using @validates. This is exclusive for 'simple' validation rules.

        This applies to:
            "remarks" & "berth_info"
            "eta_berth", "etd_berth", "lock_time", "zone_entry", "operations_start", "operations_end"

        The rules of this method are:
            - the participant type of the dataset may not contain BSMD
            - when both values of an interval are given, the interval end may not be earlier than the interval start
              ('etd_berth' to 'etd_interval_end' and 'eta_berth' to 'eta_interval_end')

        Note: loadedModel["participant_type"] is converted to ParticipantType in place.

        raises:
            ValidationError, when one of the rules is violated
        """
        # while InputValidationTimes.check_user_is_not_bsmd_type already validates a user, this method
        # validates the times dataset.

        # ensure loadedModel["participant_type"] is of type ParticipantType
        if not isinstance(loadedModel["participant_type"], ParticipantType):
            loadedModel["participant_type"] = ParticipantType(loadedModel["participant_type"])

        if ParticipantType.BSMD in loadedModel["participant_type"]:
            raise ValidationError({"participant_type":f"current user belongs to BSMD. Cannot post times datasets. Found user data: {user_data}"})

        # the interval fields are optional. '.get' treats an absent key like an undefined value,
        # so models, which do not carry these keys, do not fail with a KeyError.
        etd_start, etd_end = loadedModel.get("etd_berth"), loadedModel.get("etd_interval_end")
        if (etd_end is not None) and (etd_start is not None):
            if not (etd_end >= etd_start):
                raise ValidationError({"etd":f"The provided time interval for the estimated departure time is invalid. The interval end takes place before the interval start. Found interval data: {etd_start} to {etd_end}"})

        eta_start, eta_end = loadedModel.get("eta_berth"), loadedModel.get("eta_interval_end")
        if (eta_end is not None) and (eta_start is not None):
            if not (eta_end >= eta_start):
                raise ValidationError({"eta":f"The provided time interval for the estimated arrival time is invalid. The interval begin takes place after the interval end. Found interval data: {eta_start} to {eta_end}"})
        return

    @staticmethod
    def check_dataset_references(content:dict):
        """
        When IDs are referenced, they must exist in the database. This method individually validates the existence of the
        referred berth ID, shipcall ID and participant ID.

        Note: whenever an ID is 'None', there is no exception, because a different method is supposed to capture
        non-existent mandatory fields.

        raises:
            ValidationError, naming the first reference, which could not be validated
        """
        # extract the IDs
        berth_id = content.get("berth_id")
        shipcall_id = content.get("shipcall_id")
        participant_id = content.get("participant_id")

        if not check_if_berth_id_is_valid(berth_id):
            raise ValidationError({"berth_id":f"The referenced berth_id '{berth_id}' does not exist in the database."})

        if not check_if_shipcall_id_is_valid(shipcall_id):
            raise ValidationError({"shipcall_id":f"The referenced shipcall_id '{shipcall_id}' does not exist in the database."})

        if not check_if_participant_id_is_valid_standalone(participant_id, participant_type=None):
            raise ValidationError({"participant_id":f"The referenced participant_id '{participant_id}' does not exist in the database."})
        return

    @staticmethod
    def check_times_required_fields_post_data(loadedModel:dict, content:dict):
        """
        Depending on ShipcallType and ParticipantType, there is a set of required fields.
        Independent of those types, any POST request for times should always include the default fields.

        The dependent and independent fields are validated by checking, whether the respective value in 'content'
        is undefined (returns None). When any of these fields is undefined, a ValidationError is raised.

        raises:
            ValidationError, when the participant type is undefined, the shipcall type cannot be determined
            or at least one required field is missing in 'content'
        """
        # '.get' is used, so a model without these keys results in the ValidationError below instead of a KeyError
        participant_type = loadedModel.get("participant_type")
        shipcall_id = loadedModel.get("shipcall_id")

        # build a dictionary of id:item pairs, so one can select the respective shipcall
        # must look-up the shipcall_type based on the shipcall_id
        shipcalls = get_shipcall_id_dictionary()
        shipcall_type = ShipcallType[shipcalls.get(shipcall_id,{}).get("type",ShipcallType.undefined.name)]

        if (participant_type is None) or (int(shipcall_type) == int(ShipcallType.undefined)):
            raise ValidationError({"required_fields":"At least one of the required fields is missing. Missing: 'participant_type' or 'shipcall_type'"})

        # build a list of required fields based on shipcall and participant type, as well as type-independent fields
        independent_required_fields = InputValidationTimes.get_post_data_type_independent_fields()
        dependent_required_fields = InputValidationTimes.get_post_data_type_dependent_fields(shipcall_type, participant_type)
        required_fields = independent_required_fields + dependent_required_fields

        # collect (field_key, True) pairs for every required field, which is undefined in the request
        verbosity_tuple = [(field, True) for field in required_fields if content.get(field) is None]
        if verbosity_tuple:
            raise ValidationError({"required_fields":f"At least one of the required fields is missing. Missing: {verbosity_tuple}"})
        return

    @staticmethod
    def check_times_required_fields_put_data(content:dict):
        """in a PUT request, only the 'id' is a required field. All other fields are simply ignored, when they are not provided."""
        if content.get("id") is None:
            raise ValidationError({"id":"A PUT-request requires an 'id' reference, which was not found."})
        return

    @staticmethod
    def get_post_data_type_independent_fields()->list[str]:
        """
        Independent of the ShipcallType and ParticipantType, any POST request for times should always include the default fields.

        returns:
            a new list with the names of the always-required fields
        """
        return ["shipcall_id", "participant_id", "participant_type"]

    @staticmethod
    def get_post_data_type_dependent_fields(shipcall_type:typing.Union[int, ShipcallType], participant_type:typing.Union[int, ParticipantType]):
        """
        Select the required fields, which depend on ShipcallType and ParticipantType.

        The mapping is created by build_post_data_type_dependent_required_fields_dict. Combinations, which are
        not part of the mapping (or map to None), result in an empty list.

        args:
            shipcall_type: enumerator or its integer value
            participant_type: enumerator or its integer value

        returns:
            list of field names
        """
        # ensure that both, shipcall_type and participant_type, refer to the enumerators, as opposed to integers.
        if not isinstance(shipcall_type, ShipcallType):
            shipcall_type = ShipcallType(shipcall_type)

        if not isinstance(participant_type, ParticipantType):
            participant_type = ParticipantType(participant_type)

        # build a dictionary, which maps shipcall type and participant type to a list of fields
        dependent_required_fields_dict = build_post_data_type_dependent_required_fields_dict()

        # select shipcall type & participant type
        dependent_required_fields = dependent_required_fields_dict.get(shipcall_type,{}).get(participant_type,[])
        return dependent_required_fields if dependent_required_fields is not None else []

    @staticmethod
    def check_if_user_fits_shipcall_participant_map(user_data:dict, loadedModel:dict, content:dict, spm_shipcall_data=None):
        """
        a new dataset may only be created, if the user belongs to the participant group (participant_id),
        which is assigned to the shipcall within the ShipcallParticipantMap

        This method does not validate, what the POST-request contains, but it validates, whether the *user* is
        authorized to send the request.

        This method also checks for a special case: when an assigned AGENCY participant has the .BSMD flag enabled,
        a user of type BSMD may also post the times dataset.

        options:
            spm_shipcall_data:
                data from the ShipcallParticipantMap, which refers to the respective shipcall ID. The SPM can be
                an optional argument to allow for much easier unit testing.

        raises:
            ValidationError, when no participant of the dataset's type is assigned to the shipcall or when the
            user's participant group is not among the assigned participants
        """
        ### TIMES DATASET (ShipcallParticipantMap) ###
        # identify shipcall_id and the participant type of the dataset
        shipcall_id = loadedModel["shipcall_id"]
        dataset_participant_type = loadedModel["participant_type"]
        if not isinstance(dataset_participant_type, ParticipantType):
            dataset_participant_type = ParticipantType(dataset_participant_type)

        # get ShipcallParticipantMap for the shipcall_id
        if spm_shipcall_data is None:
            # read the ShipcallParticipantMap entries of the current shipcall_id, which match the dataset's participant type.
            # creates a list of {'participant_id: ..., 'type': ...} elements
            spm_shipcall_data = execute_sql_query_standalone(
                query = "SELECT participant_id, type FROM shipcall_participant_map WHERE (shipcall_id=?shipcall_id? AND type=?type?)",
                param={"shipcall_id":shipcall_id, "type":int(dataset_participant_type)},
                pooledConnection=None
                )

        # raises a ValidationError, when the SPM has no entry for this shipcall & participant type
        dataset_participant_id = InputValidationTimes.get_participant_id_from_shipcall_participant_map(shipcall_id, participant_type=dataset_participant_type, spm_shipcall_data=spm_shipcall_data)

        ### USER DATA (token) ###
        # identify the user's participant_id
        user_participant_id = user_data["participant_id"]

        if ParticipantType.AGENCY in dataset_participant_type:
            special_case__bsmd_may_edit_agency_dataset = InputValidationTimes.check_if_bsmd_may_edit_agency_dataset(user_participant_id, dataset_participant_id, dataset_participant_type)
            if special_case__bsmd_may_edit_agency_dataset:
                # when a BSMD user posts a dataset of an AGENCY with BSMD-flag, there is no violation
                return

        # check, if participant_id is assigned to the ShipcallParticipantMap
        user_is_assigned = any(spm.get("participant_id")==user_participant_id for spm in spm_shipcall_data)
        if not user_is_assigned:
            raise ValidationError({"participant_id":f'The participant group with id {user_participant_id} is not assigned to the shipcall. Found ShipcallParticipantMap: {spm_shipcall_data}'}) # part of a pytest.raises
        return

    @staticmethod
    def check_if_entry_already_exists_for_participant_type(user_data:dict, loadedModel:dict, content:dict):
        """
        determines, whether a dataset for the participant type is already present

        raises:
            ValidationError, when one of the shipcall's existing times entries carries a participant type,
            which is contained in the participant type of the new dataset
        """
        # determine participant_type and shipcall_id from the loadedModel
        participant_type = loadedModel["participant_type"]
        if not isinstance(participant_type, ParticipantType): # ensure the correct data type
            participant_type = ParticipantType(participant_type)

        shipcall_id = loadedModel["shipcall_id"]

        # get all times entries of the shipcall_id from the database
        times, status_code, headers = GetTimes(options={"shipcall_id":shipcall_id})
        times = json.loads(times)

        # check, if there is already a dataset for the participant type
        participant_type_exists_already = False
        for time_ in times:
            existing_type = time_.get("participant_type")
            if not existing_type:
                # entries without a participant type (None or 0) are skipped: the empty flag is 'contained'
                # in every flag value, so it would otherwise block each and every new dataset.
                continue
            if ParticipantType(existing_type) in participant_type:
                participant_type_exists_already = True
                break

        if participant_type_exists_already:
            raise ValidationError({"participant_type":f"A dataset for the participant type is already present. Participant Type: {participant_type}. Times Datasets: {times}"})
        return

    @staticmethod
    def check_user_belongs_to_same_group_as_dataset_determines(user_data:dict, loadedModel:typing.Optional[dict]=None, times_id:typing.Optional[int]=None, pdata:typing.Optional[list[dict]]=None):
        """
        This method checks, whether a user belongs to the same participant_id, as the dataset entry refers to.
        It is used in, both, PUT requests and DELETE requests, but uses different arguments to determine the matching
        time dataset entry.

        PUT:
            loadedModel is unbundled. The participant is looked up in the ShipcallParticipantMap by
            the shipcall id and the participant type of the model.
        DELETE:
            times_id is used to directly identify the matching times entry

        A special exception takes place, when a participant of type AGENCY is involved. In those times-entries, users with the
        IS_BSMD-Flag may also edit the entry.

        options:
            pdata: query result for the times entry. It is only defined in unit tests.

        raises:
            ValidationError, when there is no matching entry or the user belongs to a different participant group
        """
        # exactly one of the two identifying arguments must be provided
        assert not ((loadedModel is None) and (times_id is None)), "must provide either loadedModel OR times_id. Both are 'None'"
        assert (loadedModel is None) or (times_id is None), "must provide either loadedModel OR times_id. Both are defined."

        # identify the user's participant id
        user_participant_id = user_data["participant_id"]

        # commonly used in the PUT-request
        if loadedModel is not None:
            shipcall_id = loadedModel["shipcall_id"]
            participant_type = loadedModel["participant_type"]

            # get the matching entry from the shipcall participant map. Raise an error, when there is no match.
            participant_id_of_times_dataset = InputValidationTimes.get_participant_id_from_shipcall_participant_map(shipcall_id, participant_type)

        # commonly used in the DELETE-request
        if times_id is not None:
            if pdata is None: # regular behavior. pdata is only defined in unit tests.
                # perform an SQL query. Creates a pooled connection internally, queries the database, then closes the connection.
                query = "SELECT participant_id, participant_type, shipcall_id FROM times WHERE id = ?id?"
                pdata = execute_sql_query_standalone(query=query, param={"id":times_id}, pooledConnection=None)

            if not pdata:
                # this case is usually covered by the InputValidationTimes.check_if_entry_is_already_deleted method already
                raise ValidationError({"times_id":f"Unknown times_id. Could not find a matching entry for ID: {times_id}"})

            # the participant is taken directly from the first matching times entry
            first_match = pdata[0]
            participant_type = first_match.get("participant_type")
            participant_id_of_times_dataset = first_match.get("participant_id")

        # users of the same participant group may always change the dataset
        if user_participant_id == participant_id_of_times_dataset:
            return

        # for some AGENCY participants, users with the BSMD flag may also edit the datasets
        special_case__bsmd_may_edit_agency_dataset = InputValidationTimes.check_if_bsmd_may_edit_agency_dataset(user_participant_id, participant_id_of_times_dataset, participant_type)
        if special_case__bsmd_may_edit_agency_dataset:
            return

        raise ValidationError({"user_participant_type":f"The dataset may only be changed by a user belonging to the same participant group as the times dataset is referring to. User participant_id: {user_participant_id}; Dataset participant_id: {participant_id_of_times_dataset}"})

    @staticmethod
    def get_participant_id_from_shipcall_participant_map(shipcall_id:int, participant_type:int, spm_shipcall_data=None)->int:
        """
        use shipcall_id and participant_type to identify the matching participant_id

        options:
            spm_shipcall_data: matching entries of the ShipcallParticipantMap. When it is not provided,
                the entries are queried from the database.

        returns:
            the 'participant_id' of the first matching entry

        raises:
            ValidationError, when shipcall_id is None or there is no matching entry
        """
        if shipcall_id is None:
            raise ValidationError({"shipcall_id":"Could not find a referenced shipcall_id within the request."})

        if spm_shipcall_data is None:
            spm_shipcall_data = execute_sql_query_standalone(
                query=SQLQuery.get_shipcall_participant_map_by_shipcall_id_and_type(),
                param={"id":shipcall_id, "type":participant_type},
                command_type="query") # returns a list of matches

        # raise an error when there are no matches
        if not spm_shipcall_data:
            raise ValidationError({"participant_type":f"Could not find a matching time dataset for the provided participant_type: {participant_type} at shipcall with id {shipcall_id}."})

        return spm_shipcall_data[0].get("participant_id")

    @staticmethod
    def check_if_bsmd_may_edit_agency_dataset(user_participant_id:int, participant_id_of_times_dataset:int, participant_type:ParticipantType)->bool:
        """
        This method determines, whether a BSMD user is allowed to edit an AGENCY dataset.

        When the dataset does not refer to an agency (or has no participant type at all),
        the method is not applicable (returns False).

        If it is applicable,
            a) find out, whether the assigned participant (AGENCY) has the BSMD flag
            b) find out, whether the user is of type BSMD
        If both is true, return True

        args:
            user_participant_id: participant ID of the user, obtained from the jwt-token
            participant_id_of_times_dataset: assigned participant of the shipcall
            participant_type: participant type of the times dataset
        """
        # when the participant type of the dataset is not an AGENCY, this exception rule does not take place.
        # an undefined participant type (None) cannot be an AGENCY either.
        if participant_type is None:
            return False

        if int(participant_type) != int(ParticipantType.AGENCY):
            return False

        ### TIMES ENTRY (ShipcallParticipantMap) ###
        # identify, whether the dataset's assigned participant has the BSMD flag
        agency_has_bsmd_flag = InputValidationTimes.check_if_participant_has_bsmd_flag(participant_id=participant_id_of_times_dataset)

        ### USER DATA (token) ###
        # determine, whether the user is of participant_type BSMD
        user_is_bsmd_type = check_if_user_is_bsmd_type(user_data={"participant_id":user_participant_id})

        return bool(agency_has_bsmd_flag and user_is_bsmd_type)

    @staticmethod
    def check_if_participant_has_bsmd_flag(participant_id:int)->bool:
        """
        Given a participant_id, this method checks, whether the participant with {participant_id}
        has the .BSMD flag in the .flags field.

        The BSMD bit is tested as part of a bitmask, so additional flags of the participant
        do not hide it. When the participant cannot be found or has no flags, the result is False.
        """
        # get the dataset's assigned Participant, which matches the SPM entry
        participant = execute_sql_query_standalone(
            SQLQuery.get_participant_from_id(),
            param={"participant_id":participant_id},
            command_type="single",
            pooledConnection=None)

        if not participant:
            return False

        raw_flags = participant.get("flags")
        if raw_flags is None:
            return False

        # NOTE(review): assumes ParticipantFlag is a bit-flag enumerator with integer values - confirm in BreCal.database.enums
        participant_flags = ParticipantFlag(raw_flags)
        return bool(participant_flags.value & ParticipantFlag.BSMD.value)
def deprecated_build_post_data_type_dependent_required_fields_dict()->dict[ShipcallType,dict[ParticipantType,typing.Optional[list[str]]]]:
    """
    Deprecated variant of the type-dependent required fields table for POST-requests.

    The table is nested: ShipcallType -> ParticipantType -> list of required field names.
    The participant types 'undefined', 'BSMD' (both should not be set in POST-requests) and 'TERMINAL'
    map to an empty list. Every other participant type requires one berth time field, which is
    selected by the shipcall type:
        arrival   -> 'eta_berth'
        departure -> 'etd_berth'
        shifting  -> 'etd_berth'

    returns:
        a freshly built dictionary. Every entry holds its own list object.
    """
    berth_time_field_of_shipcall_type = {
        ShipcallType.arrival: "eta_berth",
        ShipcallType.departure: "etd_berth",
        ShipcallType.shifting: "etd_berth",
    }
    participant_types_without_required_fields = (
        ParticipantType.undefined,  # should not be set in POST requests
        ParticipantType.BSMD,       # should not be set in POST requests
        ParticipantType.TERMINAL,
    )
    participant_types_with_berth_time = (
        ParticipantType.AGENCY,
        ParticipantType.MOORING,
        ParticipantType.PILOT,
        ParticipantType.PORT_ADMINISTRATION,
        ParticipantType.TUG,
    )

    post_data_type_dependent_required_fields_dict = {}
    for shipcall_type, berth_time_field in berth_time_field_of_shipcall_type.items():
        fields_of_participant_type = {}
        for participant_type in participant_types_without_required_fields:
            fields_of_participant_type[participant_type] = []
        for participant_type in participant_types_with_berth_time:
            fields_of_participant_type[participant_type] = [berth_time_field]
        post_data_type_dependent_required_fields_dict[shipcall_type] = fields_of_participant_type
    return post_data_type_dependent_required_fields_dict