From ea5aa132d23f3c1b8d7ae6038ca619c05560b5e9 Mon Sep 17 00:00:00 2001 From: Max Metz Date: Tue, 14 May 2024 15:42:20 +0200 Subject: [PATCH] refactoring 'validate_post_shipcall_data' into a novel object InputValidationShipcall. Implemented the majority of rules for POST and PUT requests. Unit tests have not been created & run yet --- src/server/BreCal/api/shipcalls.py | 3 + src/server/BreCal/database/sql_queries.py | 99 ++++++ src/server/BreCal/impl/shipcalls.py | 14 +- src/server/BreCal/schemas/model.py | 2 +- .../BreCal/validators/input_validation.py | 110 +----- .../validators/input_validation_shipcall.py | 330 ++++++++++++++++++ .../validators/input_validation_utils.py | 146 ++++++++ 7 files changed, 595 insertions(+), 109 deletions(-) create mode 100644 src/server/BreCal/validators/input_validation_utils.py diff --git a/src/server/BreCal/api/shipcalls.py b/src/server/BreCal/api/shipcalls.py index e43f7ec..6dfef58 100644 --- a/src/server/BreCal/api/shipcalls.py +++ b/src/server/BreCal/api/shipcalls.py @@ -5,6 +5,7 @@ from ..schemas import model from .. import impl from ..services.auth_guard import auth_guard, check_jwt from BreCal.validators.input_validation import validate_posted_shipcall_data, check_if_user_is_bsmd_type +from BreCal.validators.input_validation_shipcall import InputValidationShipcall import logging import json @@ -47,6 +48,7 @@ def PostShipcalls(): # validate the posted shipcall data validate_posted_shipcall_data(user_data, loadedModel, content) + # InputValidationShipcall.evaluate_post_data(user_data, loadedModel, content) except ValidationError as ex: logging.error(ex) @@ -78,6 +80,7 @@ def PutShipcalls(): is_bsmd = check_if_user_is_bsmd_type(user_data) if not is_bsmd: raise ValidationError(f"current user does not belong to BSMD. Cannot post shipcalls. Found user data: {user_data}") + # InputValidationShipcall.evaluate_put_data(user_data, loadedModel, content) except ValidationError as ex: logging.error(ex) diff --git a/src/server/BreCal/database/sql_queries.py b/src/server/BreCal/database/sql_queries.py index 551aa91..0940254 100644 --- a/src/server/BreCal/database/sql_queries.py +++ b/src/server/BreCal/database/sql_queries.py @@ -44,3 +44,102 @@ def create_sql_query_shipcall_get(options:dict)->str: assert query==query_two """ return query + + +def create_sql_query_shipcall_post(schemaModel:dict)->str: + query = "INSERT INTO shipcall (" + isNotFirst = False + for key in schemaModel.keys(): + if key == "id": + continue + if key == "participants": + continue + if key == "created": + continue + if key == "modified": + continue + if key == "evaluation": + continue + if key == "evaluation_message": + continue + if key == "type_value": + continue + if key == "evaluation_value": + continue + if isNotFirst: + query += "," + isNotFirst = True + query += key + query += ") VALUES (" + isNotFirst = False + for key in schemaModel.keys(): + param_key = key + if key == "id": + continue + if key == "participants": + continue + if key == "created": + continue + if key == "modified": + continue + if key == "evaluation": + continue + if key == "evaluation_message": + continue + if key == "type": + param_key = "type_value" + if key == "type_value": + continue + if key == "evaluation": + param_key = "evaluation_value" + if key == "evaluation_value": + continue + if isNotFirst: + query += "," + isNotFirst = True + query += "?" + param_key + "?" + query += ")" + return + +def create_sql_query_shipcall_put(schemaModel:dict)->str: + query = "UPDATE shipcall SET " + isNotFirst = False + for key in schemaModel.keys(): + param_key = key + if key == "id": + continue + if key == "participants": + continue + if key == "created": + continue + if key == "modified": + continue + if key == "evaluation": + continue + if key == "evaluation_message": + continue + if key == "type": + param_key = "type_value" + if key == "type_value": + continue + if key == "evaluation": + param_key = "evaluation_value" + if key == "evaluation_value": + continue + if isNotFirst: + query += ", " + isNotFirst = True + query += key + " = ?" + param_key + "? " + + query += "WHERE id = ?id?" + return query + + +def create_sql_query_history_post()->str: + query = "INSERT INTO history (participant_id, shipcall_id, user_id, timestamp, eta, type, operation) VALUES (?pid?, ?scid?, ?uid?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, 1, 1)" + return query + +def create_sql_query_history_put()->str: + query = "INSERT INTO history (participant_id, shipcall_id, user_id, timestamp, eta, type, operation) VALUES (?pid?, ?scid?, ?uid?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, 1, 2)" + return query + diff --git a/src/server/BreCal/impl/shipcalls.py b/src/server/BreCal/impl/shipcalls.py index de60788..0dfcf74 100644 --- a/src/server/BreCal/impl/shipcalls.py +++ b/src/server/BreCal/impl/shipcalls.py @@ -8,7 +8,7 @@ from .. import local_db from ..services.auth_guard import check_jwt from BreCal.database.update_database import evaluate_shipcall_state -from BreCal.database.sql_queries import create_sql_query_shipcall_get +from BreCal.database.sql_queries import create_sql_query_shipcall_get, create_sql_query_shipcall_post, create_sql_query_shipcall_put def GetShipcalls(options): """ @@ -63,6 +63,7 @@ def PostShipcalls(schemaModel): pooledConnection = local_db.getPoolConnection() commands = pydapper.using(pooledConnection) + # query = create_sql_query_shipcall_post(schemaModel) query = "INSERT INTO shipcall (" isNotFirst = False for key in schemaModel.keys(): @@ -145,6 +146,11 @@ def PostShipcalls(schemaModel): commands.execute(query, {"scid" : new_id, "pid" : user_data["participant_id"], "uid" : user_data["id"]}) return json.dumps({"id" : new_id}), 201, {'Content-Type': 'application/json; charset=utf-8'} + + except ValidationError as ex: + logging.error(ex) + print(ex) + return json.dumps(f"bad format. \nError Messages: {ex.messages}. \nValid Data: {ex.valid_data}"), 400 except Exception as ex: logging.error(traceback.format_exc()) @@ -179,6 +185,7 @@ def PutShipcalls(schemaModel): pooledConnection.close() return json.dumps("no such record"), 404, {'Content-Type': 'application/json; charset=utf-8'} + # query = create_sql_query_shipcall_put(schemaModel) query = "UPDATE shipcall SET " isNotFirst = False for key in schemaModel.keys(): @@ -258,6 +265,11 @@ def PutShipcalls(schemaModel): commands.execute(query, {"scid" : schemaModel["id"], "pid" : user_data["participant_id"], "uid" : user_data["id"]}) return json.dumps({"id" : schemaModel["id"]}), 200 + + except ValidationError as ex: + logging.error(ex) + print(ex) + return json.dumps(f"bad format. \nError Messages: {ex.messages}. \nValid Data: {ex.valid_data}"), 400 except Exception as ex: logging.error(traceback.format_exc()) diff --git a/src/server/BreCal/schemas/model.py b/src/server/BreCal/schemas/model.py index acf2769..9158aed 100644 --- a/src/server/BreCal/schemas/model.py +++ b/src/server/BreCal/schemas/model.py @@ -187,7 +187,7 @@ class ShipcallSchema(Schema): super().__init__(unknown=None) pass - id = fields.Integer() + id = fields.Integer(metadata={'required':True}) ship_id = fields.Integer(metadata={'required':True}) #type = fields.Enum(ShipcallType, default=ShipcallType.undefined) # type = fields.Integer() # make enum: shipcall type. add validator type = fields.Integer(metadata={'required':True}) # make enum: shipcall type. add validator # type = fields.Enum(ShipcallType, default=ShipcallType.undefined) # type = fields.Integer() # make enum: shipcall type. add validator diff --git a/src/server/BreCal/validators/input_validation.py b/src/server/BreCal/validators/input_validation.py index 048d594..6cddf47 100644 --- a/src/server/BreCal/validators/input_validation.py +++ b/src/server/BreCal/validators/input_validation.py @@ -14,116 +14,10 @@ from BreCal.impl.berths import GetBerths from BreCal.database.enums import ParticipantType -def check_if_string_has_special_characters(text:str): - """ - check, whether there are any characters within the provided string, which are not found in the ascii letters or digits - ascii_letters: abcd (...) and ABCD (...) - digits: 0123 (...) - Source: https://stackoverflow.com/questions/57062794/is-there-a-way-to-check-if-a-string-contains-special-characters - User: https://stackoverflow.com/users/10035985/andrej-kesely - returns bool - """ - return bool(set(text).difference(ascii_letters + digits)) +from BreCal.validators.input_validation_utils import check_if_user_is_bsmd_type, get_participant_id_dictionary, check_if_ship_id_is_valid, check_if_berth_id_is_valid, check_if_participant_ids_are_valid, get_berth_id_dictionary, check_if_string_has_special_characters, get_ship_id_dictionary, check_if_int_is_valid_flag -def check_if_int_is_valid_flag(value, enum_object): - # e.g., when an IntFlag has the values 1,2,4; the maximum valid value is 7 - max_int = sum([int(val) for val in list(enum_object._value2member_map_.values())]) - return 0 < value <= max_int -def get_participant_id_dictionary(): - # get all participants - response,status_code,header = GetParticipant(options={}) - - # build a dictionary of id:item pairs, so one can select the respective participant - participants = json.loads(response) - participants = {items.get("id"):items for items in participants} - return participants - -def get_ship_id_dictionary(): - # get all ships - response,status_code,header = GetShips(token=None) - - # build a dictionary of id:item pairs, so one can select the respective participant - ships = json.loads(response) - ships = {items.get("id"):items for items in ships} - return ships - -def get_berth_id_dictionary(): - # get all berths - response,status_code,header = GetBerths(token=None) - - # build a dictionary of id:item pairs, so one can select the respective participant - berths = json.loads(response) - berths = {items.get("id"):items for items in berths} - return berths - -def check_if_user_is_bsmd_type(user_data:dict)->bool: - """ - given a dictionary of user data, determine the respective participant id and read, whether - that participant is a .BSMD-type - - Note: ParticipantType is an IntFlag. - Hence, ParticipantType(1) is ParticipantType.BSMD, - and ParticipantType(7) is [ParticipantType.BSMD, ParticipantType.TERMINAL, ParticipantType.PILOT] - - both would return 'True' - - returns: boolean. Whether the participant id is a .BSMD type element - """ - # user_data = decode token - participant_id = user_data.get("participant_id") - - # build a dictionary of id:item pairs, so one can select the respective participant - participants = get_participant_id_dictionary() - participant = participants.get(participant_id,{}) - - # boolean check: is the participant of type .BSMD? - is_bsmd = ParticipantType.BSMD in ParticipantType(participant.get("type",0)) - return is_bsmd - -def check_if_ship_id_is_valid(ship_id): - """check, whether the provided ID is valid. If it is 'None', it will be considered valid. This is, because a shipcall POST-request, does not have to include all IDs at once""" - if ship_id is None: - return True - - # build a dictionary of id:item pairs, so one can select the respective participant - ships = get_ship_id_dictionary() - - # boolean check - ship_id_is_valid = ship_id in list(ships.keys()) - return ship_id_is_valid - -def check_if_berth_id_is_valid(berth_id): - """check, whether the provided ID is valid. If it is 'None', it will be considered valid. This is, because a shipcall POST-request, does not have to include all IDs at once""" - if berth_id is None: - return True - - # build a dictionary of id:item pairs, so one can select the respective participant - berths = get_berth_id_dictionary() - - # boolean check - berth_id_is_valid = berth_id in list(berths.keys()) - return berth_id_is_valid - -def check_if_participant_id_is_valid(participant_id): - """check, whether the provided ID is valid. If it is 'None', it will be considered valid. This is, because a shipcall POST-request, does not have to include all IDs at once""" - if participant_id is None: - return True - - # build a dictionary of id:item pairs, so one can select the respective participant - participants = get_participant_id_dictionary() - - # boolean check - participant_id_is_valid = participant_id in list(participants.keys()) - return participant_id_is_valid - -def check_if_participant_ids_are_valid(participant_ids): - # check each participant id individually - valid_participant_ids = [check_if_participant_id_is_valid(participant_id) for participant_id in participant_ids] - - # boolean check, whether all participant ids are valid - return all(valid_participant_ids) def validate_posted_shipcall_data(user_data:dict, loadedModel:dict, content:dict): @@ -214,6 +108,8 @@ def validate_posted_shipcall_data(user_data:dict, loadedModel:dict, content:dict if tidal_window_from is not None: if not tidal_window_from >= time_now: raise ValidationError(f"'tidal_window_from' must be in the future. Incorrect datetime provided.") + + # #TODO: assert tidal_window_from > tidal_window_to # #TODO: len of participants > 0, if agency # * assigned participant for agency diff --git a/src/server/BreCal/validators/input_validation_shipcall.py b/src/server/BreCal/validators/input_validation_shipcall.py index e69de29..d1ded52 100644 --- a/src/server/BreCal/validators/input_validation_shipcall.py +++ b/src/server/BreCal/validators/input_validation_shipcall.py @@ -0,0 +1,330 @@ +import json +import datetime +from abc import ABC, abstractmethod +from marshmallow import ValidationError +from string import ascii_letters, digits + +from BreCal.schemas.model import Ship, Shipcall, Berth, User, Participant, ShipcallType +from BreCal.impl.participant import GetParticipant +from BreCal.impl.ships import GetShips +from BreCal.impl.berths import GetBerths + +from BreCal.database.enums import ParticipantType, ParticipantFlag +from BreCal.validators.input_validation_utils import check_if_user_is_bsmd_type, check_if_ship_id_is_valid, check_if_berth_id_is_valid, check_if_participant_ids_are_valid, check_if_string_has_special_characters, get_shipcall_id_dictionary, get_participant_type_from_user_data, check_if_int_is_valid_flag + + +class InputValidationShipcall(): + """ + This class combines a complex set of individual input validation functions into a joint object. + It uses static methods, so the object does not need to be instantiated, but functions can be called immediately. + + Example: + InputValidationShipcall.evaluate(user_data, loadedModel, content) + + When the data violates one of the rules, a marshmallow.ValidationError is raised, which details the issues. + + """ + def __init__(self) -> None: + pass + + @staticmethod + def evaluate_post_data(user_data:dict, loadedModel:dict, content:dict): + """ + this function combines multiple validation functions to verify data, which is sent to the API as a shipcall's POST-request + + checks: + 1. permission: only participants that belong to the BSMD group are allowed to POST shipcalls + 2. reference checks: all refered objects within the Shipcall must exist + 3. reasonable values: validates the values within the Shipcall + 4. existance of required fields + """ + # check for permission (only BSMD-type participants) + InputValidationShipcall.check_user_is_bsmd_type(user_data) + + # check references (referred IDs must exist) + InputValidationShipcall.check_referenced_ids(loadedModel) + + # check for reasonable values in the shipcall fields + InputValidationShipcall.check_shipcall_values(loadedModel, content, forbidden_keys=["canceled", "evaluation", "evaluation_message"]) + + # POST-request only: check the existance of required fields based on the ShipcallType + InputValidationShipcall.check_required_fields_exist_based_on_type(loadedModel, content) + + # POST-request only: check the existance of a participant list, when the user is of type agency + InputValidationShipcall.check_participant_list_not_empty_when_user_is_agency(user_data, loadedModel) + return + + @staticmethod + def evaluate_put_data(user_data:dict, loadedModel:dict, content:dict): + """ + this function combines multiple validation functions to verify data, which is sent to the API as a shipcall's PUT-request + + checks: + 1. whether the user belongs to participant group type BSMD + 2. users of the agency may edit the shipcall, when the shipcall-participant-map entry lists them + 3. all value-rules of the POST evaluation + 4. a canceled shipcall may not be changed + 5. existance of required fields + """ + # check for permission (only BSMD-type participants) + InputValidationShipcall.check_user_is_bsmd_type(user_data) + + # check, whether an agency is listed in the shipcall-participant-map + # InputValidationShipcall.check_agency_in_shipcall_participant_map() # args? + + # check for reasonable values in the shipcall fields and checks for forbidden keys. Note: 'canceled' is allowed in PUT-requests. + InputValidationShipcall.check_shipcall_values(loadedModel, content, forbidden_keys=["evaluation", "evaluation_message"]) + + # a canceled shipcall cannot be selected + InputValidationShipcall.check_shipcall_is_canceled(loadedModel, content) + + # the ID field is required, all missing fields will be ignored in the update + InputValidationShipcall.check_required_fields_of_put_request(content) + return + + @staticmethod + def check_shipcall_values(loadedModel:dict, content:dict, forbidden_keys:list=["canceled", "evaluation", "evaluation_message"]): + """ + individually checks each value provided in the loadedModel/content. + This function validates, whether the values are reasonable. + + Also, some data may not be set in a POST-request. + """ + # Note: BreCal.schemas.model.ShipcallSchema has an internal validation, which the marshmallow library provides. This is used + # to verify values individually, when the schema is loaded with data. + # This function focuses on more complex input validation, which may require more sophisticated methods + + # loadedModel fills missing values, sometimes using optional values. Hence, the 'content'-variable is prefered for some of these verifications + # voyage shall not contain special characters + voyage_str_is_invalid = check_if_string_has_special_characters(text=content.get("voyage","")) + if voyage_str_is_invalid: + raise ValidationError(f"there are invalid characters in the 'voyage'-string. Please use only digits and ASCII letters. Allowed: {ascii_letters+digits}. Found: {content.get('voyage')}") + + # the 'flags' integer must be valid + flags_value = content.get("flags", 0) + if check_if_int_is_valid_flag(flags_value, enum_object=ParticipantFlag): + raise ValidationError(f"incorrect value provided for 'flags'. Must be a valid combination of the flags.") + + # time values must use future-dates + InputValidationShipcall.check_times_are_in_future(loadedModel, content) + + # some arguments must not be provided + InputValidationShipcall.check_forbidden_arguments(content, forbidden_keys=forbidden_keys) + return + + @staticmethod + def check_agency_in_shipcall_participant_map(): # args? + return + + @staticmethod + def check_user_is_bsmd_type(user_data): + """ + check, whether the user belongs to a participant, which is of type ParticipantType.BSMD + as ParticipantType is an IntFlag, a user belonging to multiple groups is properly evaluated. + """ + is_bsmd = check_if_user_is_bsmd_type(user_data) + if not is_bsmd: + raise ValidationError(f"current user does not belong to BSMD. Cannot post shipcalls. Found user data: {user_data}") + return + + @staticmethod + def check_referenced_ids(loadedModel): + """ + check, whether the referenced entries exist (e.g., when a Ship ID is referenced, but does not exist, the validation fails) + """ + # get all IDs from the loadedModel + ship_id = loadedModel.get("ship_id", None) + arrival_berth_id = loadedModel.get("arrival_berth_id", None) + departure_berth_id = loadedModel.get("departure_berth_id", None) + participant_ids = loadedModel.get("participants",[]) + + valid_ship_id = check_if_ship_id_is_valid(ship_id=ship_id) + if not valid_ship_id: + raise ValidationError(f"provided an invalid ship id, which is not found in the database: {ship_id}") + + valid_arrival_berth_id = check_if_berth_id_is_valid(berth_id=arrival_berth_id) + if not valid_arrival_berth_id: + raise ValidationError(f"provided an invalid arrival berth id, which is not found in the database: {arrival_berth_id}") + + valid_departure_berth_id = check_if_berth_id_is_valid(berth_id=departure_berth_id) + if not valid_departure_berth_id: + raise ValidationError(f"provided an invalid departure berth id, which is not found in the database: {departure_berth_id}") + + valid_participant_ids = check_if_participant_ids_are_valid(participant_ids=participant_ids) + if not valid_participant_ids: + raise ValidationError(f"one of the provided participant ids is invalid. Could not find one of these in the database: {participant_ids}") + return + + @staticmethod + def check_forbidden_arguments(content:dict, forbidden_keys=["canceled", "evaluation", "evaluation_message"]): + """ + a post-request must not contain the arguments 'canceled', 'evaluation', 'evaluation_message'. + a put-request must not contain the arguments 'evaluation', 'evaluation_message' + + """ + # the following keys should not be set in a POST-request. + for forbidden_key in forbidden_keys: + value = content.get(forbidden_key, None) + if value is not None: + raise ValidationError(f"'{forbidden_key}' may not be set on POST. Found: {value}") + return + + @staticmethod + def check_required_fields_exist_based_on_type(loadedModel:dict, content:dict): + """ + depending on the ShipcallType, some fields are *required* in a POST-request + """ + type_ = loadedModel.get("type", int(ShipcallType.undefined)) + eta = content.get("eta", None) + etd = content.get("etd", None) + arrival_berth_id = content.get("arrival_berth_id", None) + departure_berth_id = content.get("departure_berth_id", None) + + if int(type_)==int(ShipcallType.undefined): + raise ValidationError(f"providing 'type' is mandatory. Missing key!") + + # arrival: arrival_berth_id & eta must exist + elif int(type_)==int(ShipcallType.arrival): + if eta is None: + raise ValidationError(f"providing 'eta' is mandatory. Missing key!") + + if arrival_berth_id is None: + raise ValidationError(f"providing 'arrival_berth_id' is mandatory. Missing key!") + + # departure: departive_berth_id and etd must exist + elif int(type_)==int(ShipcallType.departure): + if etd is None: + raise ValidationError(f"providing 'etd' is mandatory. Missing key!") + + if departure_berth_id is None: + raise ValidationError(f"providing 'departure_berth_id' is mandatory. Missing key!") + + # shifting: arrival_berth_id, departure_berth_id, eta and etd must exist + elif int(type_)==int(ShipcallType.shifting): + if (eta is None) or (etd is None): + raise ValidationError(f"providing 'eta' and 'etd' is mandatory. Missing one of those keys!") + if (arrival_berth_id is None) or (departure_berth_id is None): + raise ValidationError(f"providing 'arrival_berth_id' & 'departure_berth_id' is mandatory. Missing key!") + + else: + raise ValidationError(f"incorrect 'type' provided!") + return + + @staticmethod + def check_times_are_in_future(loadedModel:dict, content:dict): + """ + Dates should be in the future. Depending on the ShipcallType, specific values should be checked + Perfornms datetime checks in the loadedModel (datetime.datetime objects). + """ + # obtain the current datetime to check, whether the provided values are in the future + time_now = datetime.datetime.now() + + type_ = loadedModel.get("type", int(ShipcallType.undefined)) + eta = loadedModel.get("eta") + etd = loadedModel.get("etd") + tidal_window_from = loadedModel.get("tidal_window_from", None) + tidal_window_to = loadedModel.get("tidal_window_to", None) + + # Estimated arrival or departure times + InputValidationShipcall.check_times_in_future_based_on_type(type_, time_now, eta, etd) + + # Tidal Window + InputValidationShipcall.check_tidal_window_in_future(time_now, tidal_window_from, tidal_window_to) + return + + @staticmethod + def check_times_in_future_based_on_type(type_, time_now, eta, etd): + """ + checks, whether the ETA & ETD times are in the future. + based on the type, this function checks: + arrival: eta + departure: etd + shifting: eta & etd + """ + if int(type_)==int(ShipcallType.undefined): + raise ValidationError(f"providing 'type' is mandatory. Missing key!") + elif int(type_)==int(ShipcallType.arrival): + if not eta >= time_now: + raise ValidationError(f"'eta' must be in the future. Incorrect datetime provided. Current Time: {time_now}. ETA: {eta}.") + elif int(type_)==int(ShipcallType.departure): + if not etd >= time_now: + raise ValidationError(f"'etd' must be in the future. Incorrect datetime provided. Current Time: {time_now}. ETD: {etd}.") + elif int(type_)==int(ShipcallType.shifting): + if (not eta >= time_now) or (not etd >= time_now): + raise ValidationError(f"'eta' and 'etd' must be in the future. Incorrect datetime provided. Current Time: {time_now}. ETA: {eta}. ETD: {etd}") + if (not eta >= etd): + raise ValidationError(f"'etd' must be larger than 'eta'. The ship cannot depart, before it has arrived. Found: ETA {eta}, ETA: {etd}") + return + + @staticmethod + def check_tidal_window_in_future(time_now, tidal_window_from, tidal_window_to): + if tidal_window_to is not None: + if not tidal_window_to >= time_now: + raise ValidationError(f"'tidal_window_to' must be in the future. Incorrect datetime provided.") + + if tidal_window_from is not None: + if not tidal_window_from >= time_now: + raise ValidationError(f"'tidal_window_from' must be in the future. Incorrect datetime provided.") + + if (tidal_window_to is not None) and (tidal_window_from is not None): + if tidal_window_to < tidal_window_from: + raise ValidationError(f"'tidal_window_to' must take place after 'tidal_window_from'. Incorrect datetime provided. Found 'tidal_window_to': {tidal_window_to}, 'tidal_window_from': {tidal_window_to}.") + return + + @staticmethod + def check_participant_list_not_empty_when_user_is_agency(user_data, loadedModel): + """ + participant types use an IntFlag to assign multiple roles to a user. When a user is assigned to the + AGENCY role, the user must provide a non-empty list of participants in POST-requests. + """ + participant_type = get_participant_type_from_user_data(user_data) + + if int(ParticipantType.AGENCY) in int(participant_type): + participants = loadedModel.get("participants",[]) + + if len(participants)==0: + raise ValidationError(f"A user of type 'ParticipantType.AGENCY' is required to provide a list of valid participants.") + return + + @staticmethod + def check_shipcall_is_canceled(loadedModel, content): + # read the shipcall_id from the PUT data + shipcall_id = loadedModel.get("id") + + # get all shipcalls in the database + shipcalls = get_shipcall_id_dictionary() + + # search for the matching shipcall in the database + shipcall = shipcalls.get(shipcall_id,{}) + + # if the *existing* shipcall in the database is canceled, it may not be changed + if shipcall.get("canceled", False): + raise ValidationError(f"The shipcall with id 'shipcall_id' is canceled. A canceled shipcall may not be changed.") + return + + @staticmethod + def check_required_fields_of_put_request(content:dict): + shipcall_id = content.get("id", None) + if shipcall_id is None: + raise ValidationError(f"A PUT request requires an 'id' to refer to.") + + + +""" +# copy +def validate_posted_shipcall_data(user_data:dict, loadedModel:dict, content:dict): + ##### Section 1: check user_data ##### + # DONE: refactored + + ##### Section 2: check loadedModel ##### + # DONE: refactored + + ##### Section 3: check content ##### + # DONE: refactored + + + ##### Section 4: check loadedModel & content ##### + # DONE: refactored ET and BERTH ID existance check + # DONE: refactored 'time in future' checks + return +""" \ No newline at end of file diff --git a/src/server/BreCal/validators/input_validation_utils.py b/src/server/BreCal/validators/input_validation_utils.py new file mode 100644 index 0000000..dd38a4a --- /dev/null +++ b/src/server/BreCal/validators/input_validation_utils.py @@ -0,0 +1,146 @@ +import json +from string import ascii_letters, digits + +from BreCal.impl.participant import GetParticipant +from BreCal.impl.ships import GetShips +from BreCal.impl.berths import GetBerths +from BreCal.impl.shipcalls import GetShipcalls + +from BreCal.database.enums import ParticipantType + +def get_participant_id_dictionary(): + """ + get a dictionary of all participants, where the key is the participant's id, and the value is a dictionary + of common participant data (not a data model). + """ + # get all participants + response,status_code,header = GetParticipant(options={}) + + # build a dictionary of id:item pairs, so one can select the respective participant + participants = json.loads(response) + participants = {items.get("id"):items for items in participants} + return participants + +def get_berth_id_dictionary(): + # get all berths + response,status_code,header = GetBerths(token=None) + + # build a dictionary of id:item pairs, so one can select the respective participant + berths = json.loads(response) + berths = {items.get("id"):items for items in berths} + return berths + +def get_ship_id_dictionary(): + # get all ships + response,status_code,header = GetShips(token=None) + + # build a dictionary of id:item pairs, so one can select the respective participant + ships = json.loads(response) + ships = {items.get("id"):items for items in ships} + return ships + +def get_shipcall_id_dictionary(): + # get all ships + response,status_code,header = GetShipcalls(token=None) + + # build a dictionary of id:item pairs, so one can select the respective participant + shipcalls = json.loads(response) + shipcalls = {items.get("id"):items for items in shipcalls} + return shipcalls + + +def get_participant_type_from_user_data(user_data:dict)->ParticipantType: + # user_data = decode token + participant_id = user_data.get("participant_id") + + # build a dictionary of id:item pairs, so one can select the respective participant + participants = get_participant_id_dictionary() + participant = participants.get(participant_id,{}) + participant_type = ParticipantType(participant.get("type",0)) + return participant_type + +def check_if_user_is_bsmd_type(user_data:dict)->bool: + """ + given a dictionary of user data, determine the respective participant id and read, whether + that participant is a .BSMD-type + + Note: ParticipantType is an IntFlag. + Hence, ParticipantType(1) is ParticipantType.BSMD, + and ParticipantType(7) is [ParticipantType.BSMD, ParticipantType.TERMINAL, ParticipantType.PILOT] + + both would return 'True' + + returns: boolean. Whether the participant id is a .BSMD type element + """ + # use the decoded JWT token and extract the participant type + participant_type = get_participant_type_from_user_data(user_data) + + # boolean check: is the participant of type .BSMD? + is_bsmd = ParticipantType.BSMD in participant_type + return is_bsmd + + +def check_if_ship_id_is_valid(ship_id): + """check, whether the provided ID is valid. If it is 'None', it will be considered valid. This is, because a shipcall POST-request, does not have to include all IDs at once""" + if ship_id is None: + return True + + # build a dictionary of id:item pairs, so one can select the respective participant + ships = get_ship_id_dictionary() + + # boolean check + ship_id_is_valid = ship_id in list(ships.keys()) + return ship_id_is_valid + +def check_if_berth_id_is_valid(berth_id): + """check, whether the provided ID is valid. If it is 'None', it will be considered valid. This is, because a shipcall POST-request, does not have to include all IDs at once""" + if berth_id is None: + return True + + # build a dictionary of id:item pairs, so one can select the respective participant + berths = get_berth_id_dictionary() + + # boolean check + berth_id_is_valid = berth_id in list(berths.keys()) + return berth_id_is_valid + +def check_if_participant_id_is_valid(participant_id): + """check, whether the provided ID is valid. If it is 'None', it will be considered valid. This is, because a shipcall POST-request, does not have to include all IDs at once""" + # #TODO1: Daniel Schick: 'types may only appear once and must not include type "BSMD"' + + if participant_id is None: + return True + + # build a dictionary of id:item pairs, so one can select the respective participant + participants = get_participant_id_dictionary() + + # boolean check + participant_id_is_valid = participant_id in list(participants.keys()) + return participant_id_is_valid + +def check_if_participant_ids_are_valid(participant_ids): + # check each participant id individually + valid_participant_ids = [check_if_participant_id_is_valid(participant_id) for participant_id in participant_ids] + + # boolean check, whether all participant ids are valid + return all(valid_participant_ids) + + +def check_if_string_has_special_characters(text:str): + """ + check, whether there are any characters within the provided string, which are not found in the ascii letters or digits + ascii_letters: abcd (...) and ABCD (...) + digits: 0123 (...) + + Source: https://stackoverflow.com/questions/57062794/is-there-a-way-to-check-if-a-string-contains-special-characters + User: https://stackoverflow.com/users/10035985/andrej-kesely + returns bool + """ + return bool(set(text).difference(ascii_letters + digits)) + +def check_if_int_is_valid_flag(value, enum_object): + # e.g., when an IntFlag has the values 1,2,4; the maximum valid value is 7 + max_int = sum([int(val) for val in list(enum_object._value2member_map_.values())]) + return 0 < value <= max_int + +