From cd8c246d72e0d2edb2babfbe957c1917628b41cc Mon Sep 17 00:00:00 2001 From: Max Metz Date: Mon, 10 Jun 2024 14:45:36 +0200 Subject: [PATCH] adapting times POST, PUT, DELETE to properly include the special case, where a BSMD user should also be allowed to handle times entries of an AGENCY, if the agency has the flag set. --- src/server/BreCal/database/sql_queries.py | 10 ++ .../validators/input_validation_times.py | 139 +++++++++++++----- .../validators/input_validation_utils.py | 13 +- 3 files changed, 121 insertions(+), 41 deletions(-) diff --git a/src/server/BreCal/database/sql_queries.py b/src/server/BreCal/database/sql_queries.py index 51a3dea..9888ada 100644 --- a/src/server/BreCal/database/sql_queries.py +++ b/src/server/BreCal/database/sql_queries.py @@ -298,6 +298,11 @@ class SQLQuery(): def get_participants()->str: query = "SELECT participant_id, type FROM shipcall_participant_map WHERE shipcall_id=?shipcall_id?" return query + + @staticmethod + def get_participant_from_id()->str: + query = "SELECT id, type, flags FROM participant WHERE id=?participant_id?" + return query @staticmethod def get_shipcall_post(schemaModel:dict)->str: @@ -361,6 +366,11 @@ class SQLQuery(): def get_shipcall_participant_map_by_shipcall_id()->str: query = "SELECT id, participant_id, type FROM shipcall_participant_map where shipcall_id = ?id?" return query + + @staticmethod + def get_shipcall_participant_map_by_shipcall_id_and_type()->str: + query = "SELECT id, participant_id FROM shipcall_participant_map where (shipcall_id = ?id? AND type=?type?)" + return query @staticmethod def get_shipcall_participant_map_delete_by_id()->str: diff --git a/src/server/BreCal/validators/input_validation_times.py b/src/server/BreCal/validators/input_validation_times.py index 6b8ab6a..5ad3c67 100644 --- a/src/server/BreCal/validators/input_validation_times.py +++ b/src/server/BreCal/validators/input_validation_times.py @@ -13,6 +13,7 @@ from BreCal.impl.times import GetTimes from BreCal.database.enums import ParticipantType, ParticipantFlag from BreCal.validators.input_validation_utils import check_if_user_is_bsmd_type, check_if_ship_id_is_valid, check_if_berth_id_is_valid, check_if_participant_ids_are_valid, check_if_participant_ids_and_types_are_valid, check_if_shipcall_id_is_valid, get_shipcall_id_dictionary, get_participant_type_from_user_data, get_participant_id_dictionary, check_if_participant_id_is_valid_standalone +from BreCal.database.sql_queries import SQLQuery from BreCal.database.sql_handler import execute_sql_query_standalone from BreCal.validators.validation_base_utils import check_if_int_is_valid_flag, check_if_string_has_special_characters import werkzeug @@ -281,44 +282,50 @@ class InputValidationTimes(): This method does not validate, what the POST-request contains, but it validates, whether the *user* is authorized to send the request. + This method also checks for a special case: when an assigned AGENCY participant has the .BSMD flag enabled, + a user of type BSMD may also post the times dataset. + options: spm_shipcall_data: data from the ShipcallParticipantMap, which refers to the respective shipcall ID. The SPM can be an optional argument to allow for much easier unit testing. """ - + ### TIMES DATASET (ShipcallParticipantMap) ### # identify shipcall_id shipcall_id = loadedModel["shipcall_id"] - - # identify user's participant_id & type (get all participants; then filter these for the {participant_id}) - participant_id = user_data["participant_id"] #participants = get_participant_id_dictionary() #participant_type = ParticipantType(participants.get(participant_id,{}).get("type")) - participant_type = ParticipantType(loadedModel["participant_type"]) if not isinstance(loadedModel["participant_type"],ParticipantType) else loadedModel["participant_type"] + DATASET_participant_type = ParticipantType(loadedModel["participant_type"]) if not isinstance(loadedModel["participant_type"],ParticipantType) else loadedModel["participant_type"] # get ShipcallParticipantMap for the shipcall_id if spm_shipcall_data is None: # read the ShipcallParticipantMap entry of the current shipcall_id. This is used within the input validation of a PUT request # creates a list of {'participant_id: ..., 'type': ...} elements spm_shipcall_data = execute_sql_query_standalone( - query = "SELECT participant_id, type FROM shipcall_participant_map WHERE shipcall_id=?shipcall_id?", - param={"shipcall_id":shipcall_id}, + query = "SELECT participant_id, type FROM shipcall_participant_map WHERE (shipcall_id=?shipcall_id? AND type=?type?)", + param={"shipcall_id":shipcall_id, "type":int(DATASET_participant_type)}, pooledConnection=None ) + + DATASET_participant_id = InputValidationTimes.get_participant_id_from_shipcall_participant_map(shipcall_id, participant_type=DATASET_participant_type, spm_shipcall_data=spm_shipcall_data) + + ### USER DATA (token) ### + # identify user's participant_id & type (get all participants; then filter these for the {participant_id}) + user_participant_id = user_data["participant_id"] #participants = get_participant_id_dictionary() #participant_type = ParticipantType(participants.get(participant_id,{}).get("type")) + if (ParticipantType.AGENCY in DATASET_participant_type): + special_case__bsmd_may_edit_agency_dataset = InputValidationTimes.check_if_bsmd_may_edit_agency_dataset(user_participant_id, DATASET_participant_id, DATASET_participant_type) + if (special_case__bsmd_may_edit_agency_dataset): + # when a BSMD user posts a dataset of an AGENCY with BSMD-flag, there is no violation + return + # check, if participant_id is assigned to the ShipcallParticipantMap matching_spm = [ spm for spm in spm_shipcall_data - if spm.get("participant_id")==participant_id + if spm.get("participant_id")==user_participant_id ] if not len(matching_spm)>0: - raise ValidationError(f'The participant group with id {participant_id} is not assigned to the shipcall. Found ShipcallParticipantMap: {spm_shipcall_data}') - - # check, if the assigned participant_id is assigned with the same role - matching_spm_element = matching_spm[0] - matching_spm_element_participant_type = ParticipantType(matching_spm_element.get("type")) - if not matching_spm_element_participant_type in participant_type: - raise ValidationError(f'The participant group with id {participant_id} is assigned to the shipcall in a different role. Request Role: {participant_type}, ShipcallParticipantMap Role Assignment: {matching_spm_element_participant_type}') + raise ValidationError(f'The participant group with id {user_participant_id} is not assigned to the shipcall. Found ShipcallParticipantMap: {spm_shipcall_data}') return @staticmethod @@ -352,6 +359,9 @@ class InputValidationTimes(): DELETE: times_id is used to directly identify the matching times entry + + A special exception takes place, when a participant of type AGENCY is involved. In those times-entries, users with the + IS_BSMD-Flag may also edit the entry. """ assert not ((loadedModel is None) and (times_id is None)), f"must provide either loadedModel OR times_id. Both are 'None'" assert (loadedModel is None) or (times_id is None), f"must provide either loadedModel OR times_id. Both are defined." @@ -359,28 +369,18 @@ class InputValidationTimes(): # identify the user's participant id user_participant_id = user_data["participant_id"] + # commonly used in the PUT-request if loadedModel is not None: shipcall_id = loadedModel["shipcall_id"] participant_type = loadedModel["participant_type"] - # get all times entries of the shipcall_id from the database as a list of {'participant_id':..., 'participant_type':...} elements - query = "SELECT participant_id, participant_type FROM times WHERE shipcall_id = ?shipcall_id?" - times = execute_sql_query_standalone(query=query, param={"shipcall_id":shipcall_id}, pooledConnection=None) - - # get the matching datasets, where the participant id is identical - time_datasets_of_participant_type = [time_ for time_ in times if time_.get("participant_type")==participant_type] - - # when there are no matching participants, raise a ValidationError - if not len(time_datasets_of_participant_type)>0: - raise ValidationError(f"Could not find a matching time dataset for the provided participant_type: {participant_type}. Found Time Datasets: {times}") - - # take the first match. There should always be only one match. - time_datasets_of_participant_type = time_datasets_of_participant_type[0] - participant_id_of_times_dataset = time_datasets_of_participant_type.get("participant_id") + # get the matching entry from the shipcall participant map. Raise an error, when there is no match. + participant_id_of_times_dataset = InputValidationTimes.get_participant_id_from_shipcall_participant_map(shipcall_id, participant_type) + # commonly used in the DELETE-request if times_id is not None: # perform an SQL query. Creates a pooled connection internally, queries the database, then closes the connection. - query = "SELECT participant_id FROM times WHERE id = ?id?" + query = "SELECT participant_id, participant_type FROM times WHERE id = ?id?" pdata = execute_sql_query_standalone(query=query, param={"id":times_id}, pooledConnection=None) # extracts the participant_id from the first matching entry, if applicable @@ -388,15 +388,80 @@ class InputValidationTimes(): # this case is usually covered by the InputValidationTimes.check_if_entry_is_already_deleted method already raise ValidationError(f"Unknown times_id. Could not find a matching entry for ID: {times_id}") else: - participant_id_of_times_dataset = pdata[0].get("participant_id") + participant_type = pdata[0].get("participant_type") + shipcall_id = pdata[0].get("shipcall_id") + # get the matching entry from the shipcall participant map. Raise an error, when there is no match. + participant_id_of_times_dataset = InputValidationTimes.get_participant_id_from_shipcall_participant_map(shipcall_id, participant_type) + + # when the user's participant id is different from the times dataset, an exception is raised if user_participant_id != participant_id_of_times_dataset: - raise ValidationError(f"The dataset may only be changed by a user belonging to the same participant group as the times dataset is referring to. User participant_id: {user_participant_id}; Dataset participant_id: {participant_id_of_times_dataset}") + # for some AGENCY participants, users with the BSMD flag may also edit the datasets + special_case__bsmd_may_edit_agency_dataset = InputValidationTimes.check_if_bsmd_may_edit_agency_dataset(user_participant_id, participant_id_of_times_dataset, participant_type) + if special_case__bsmd_may_edit_agency_dataset: + return + else: + raise ValidationError(f"The dataset may only be changed by a user belonging to the same participant group as the times dataset is referring to. User participant_id: {user_participant_id}; Dataset participant_id: {participant_id_of_times_dataset}") return - - - - - + @staticmethod + def get_participant_id_from_shipcall_participant_map(shipcall_id:int, participant_type:int, spm_shipcall_data=None)->int: + """use shipcall_id and participant_type to identify the matching participant_id""" + if spm_shipcall_data is None: + spm_shipcall_data = execute_sql_query_standalone( + query=SQLQuery.get_shipcall_participant_map_by_shipcall_id_and_type(), + param={"id":shipcall_id, "type":participant_type}, + command_type="query") # returns a list of matches + # raise an error when there are no matches + if len(spm_shipcall_data)==0: + raise ValidationError(f"Could not find a matching time dataset for the provided participant_type: {participant_type} at shipcall with id {shipcall_id}.") + + participant_id_of_times_dataset = spm_shipcall_data[0].get("participant_id") + return participant_id_of_times_dataset + + @staticmethod + def check_if_bsmd_may_edit_agency_dataset(user_participant_id:int, participant_id_of_times_dataset:int, participant_type:ParticipantType)->bool: + """ + This method determines, whether a BSMD user is allowed to edit an AGENCY dataset. + When the dataset does not refer to an agency, the method is not applicable (returns False). + + If it is applicable, + a) find out, whether the assigned participant (AGENCY) has the BSMD flag + b) find out, whether the user is of type BSMD + If both is true, return True + + args: + user_participant_id: ID of the user, obtained from the jwt-token + participant_id_of_times_dataset: assigned participant of the shipcall, obtained from the ShipcallParticipantMap + """ + # when the participant type of the dataset is not an AGENCY, this exception rule does not take place + dataset_participant_type_is_agency = int(participant_type)==int(ParticipantType.AGENCY) + if not dataset_participant_type_is_agency: + return False + + ### TIMES ENTRY (ShipcallParticipantMap) ### + # identify, whether the dataset's assigned participant has the BSMD flag + agency_has_bsmd_flag = InputValidationTimes.check_if_participant_has_bsmd_flag(participant_id=participant_id_of_times_dataset) + + ### USER DATA (token) ### + # determine, whether the user is of participant_type BSMD + user_is_bsmd_type = check_if_user_is_bsmd_type(user_data={"participant_id":user_participant_id}) + return (agency_has_bsmd_flag) & (user_is_bsmd_type) + + @staticmethod + def check_if_participant_has_bsmd_flag(participant_id:int)->bool: + """ + Given a participant_id, this method checks, whether the participant with {participant_id} + has the .BSMD flag in the .flags field. + """ + # get the dataset's assigned Participant, which matches the SPM entry + participant = execute_sql_query_standalone( + SQLQuery.get_participant_from_id(), + param={"id":participant_id}, + command_type="single", + pooledConnection=None) + + has_bsmd_flag = ParticipantFlag.BSMD in [ParticipantFlag(participant.get("flags"))] + return has_bsmd_flag + diff --git a/src/server/BreCal/validators/input_validation_utils.py b/src/server/BreCal/validators/input_validation_utils.py index 325e3fb..19c6ac9 100644 --- a/src/server/BreCal/validators/input_validation_utils.py +++ b/src/server/BreCal/validators/input_validation_utils.py @@ -49,15 +49,20 @@ def get_shipcall_id_dictionary(): shipcalls = {items.get("id"):items for items in shipcalls} return shipcalls +def get_participant_type_from_participant_id(participant_id:int)->ParticipantType: + # build a dictionary of id:item pairs, so one can select the respective participant + participants = get_participant_id_dictionary() + participant = participants.get(participant_id,{}) + participant_type = ParticipantType(participant.get("type",0)) + return participant_type + def get_participant_type_from_user_data(user_data:dict)->ParticipantType: # user_data = decode token participant_id = user_data.get("participant_id") - # build a dictionary of id:item pairs, so one can select the respective participant - participants = get_participant_id_dictionary() - participant = participants.get(participant_id,{}) - participant_type = ParticipantType(participant.get("type",0)) + # builds an internal dictionary of id:item pairs, so one can select the respective participant + participant_type = get_participant_type_from_participant_id(participant_id) return participant_type def check_if_user_is_bsmd_type(user_data:dict)->bool: