From c0902c65ee7754185acc519375f8e340efbafddd Mon Sep 17 00:00:00 2001 From: scopesorting <143536577+scopesorting@users.noreply.github.com> Date: Tue, 15 Oct 2024 15:19:08 +0200 Subject: [PATCH] =?UTF-8?q?regardless=20of=20the=20BSMD=20flag,=20BSMD=20u?= =?UTF-8?q?sers=20are=20now=20able=20to=20perform=20shipc=E2=80=A6=20(#51)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * regardless of the BSMD flag, BSMD users are now able to perform shipcall PUT-requests * regardless of the BSMD flag, BSMD users are now able to perform shipcall PUT-requests * docstrings and BSMD-flag handling --- src/server/BreCal/database/sql_handler.py | 30 +++++ .../validators/input_validation_shipcall.py | 44 +++---- .../validators/input_validation_times.py | 120 +++++++++++++----- 3 files changed, 142 insertions(+), 52 deletions(-) diff --git a/src/server/BreCal/database/sql_handler.py b/src/server/BreCal/database/sql_handler.py index 5440f20..2efbf18 100644 --- a/src/server/BreCal/database/sql_handler.py +++ b/src/server/BreCal/database/sql_handler.py @@ -6,6 +6,9 @@ import typing from BreCal.schemas.model import Shipcall, Ship, Participant, Berth, User, Times, ShipcallParticipantMap from BreCal.database.enums import ParticipantType from BreCal.local_db import getPoolConnection +from BreCal.database.sql_queries import SQLQuery +from BreCal.schemas import model + def pandas_series_to_data_model(): return @@ -89,7 +92,14 @@ def execute_sql_query_standalone(query, param={}, pooledConnection=None, model=N schemas = commands.query_single_or_default(query, sentinel, param=param) if model is None else commands.query_single_or_default(query, sentinel, param=param, model=model) if schemas is sentinel: raise Exception("no such record") + elif command_type=="single_or_none": + sentinel = object() + # pulls a *single* row from the query. Typically, these queries require an ID within the param dictionary. + # when providing a model, such as model.Shipcall, the dataset is immediately translated into a data model. + schemas = commands.query_single_or_default(query, sentinel, param=param) if model is None else commands.query_single_or_default(query, sentinel, param=param, model=model) + schemas = None if schemas is sentinel else schemas + elif command_type=="execute_scalar": schemas = commands.execute_scalar(query) @@ -100,6 +110,26 @@ def execute_sql_query_standalone(query, param={}, pooledConnection=None, model=N if rebuild_pooled_connection: pooledConnection.close() return schemas + +def get_assigned_participant_of_type(shipcall_id:int, participant_type:typing.Union[int,model.ParticipantType])->typing.Optional[model.Participant]: + """obtains the ShipcallParticipantMap of a given shipcall and finds the participant id of a desired type. Finally, returns the respective Participant""" + spm_shipcall_data = execute_sql_query_standalone( + query=SQLQuery.get_shipcall_participant_map_by_shipcall_id_and_type(), + param={"id":shipcall_id, "type":participant_type}, + command_type="query") # returns a list of matches + + if len(spm_shipcall_data)==0: + return None + + query = 'SELECT * FROM participant WHERE id=?participant_id?' + assigned_participant = execute_sql_query_standalone( + query=query, + param={"participant_id":spm_shipcall_data[0]["participant_id"]}, + model=model.Participant, + command_type="single_or_none" + ) # returns a list of matches + return assigned_participant + class SQLHandler(): """ diff --git a/src/server/BreCal/validators/input_validation_shipcall.py b/src/server/BreCal/validators/input_validation_shipcall.py index c8fb23e..bcfb655 100644 --- a/src/server/BreCal/validators/input_validation_shipcall.py +++ b/src/server/BreCal/validators/input_validation_shipcall.py @@ -12,6 +12,7 @@ from BreCal.impl.berths import GetBerths from BreCal.database.enums import ParticipantType, ParticipantFlag from BreCal.validators.input_validation_utils import check_if_user_is_bsmd_type, check_if_ship_id_is_valid, check_if_berth_id_is_valid, check_if_participant_ids_are_valid, check_if_participant_ids_and_types_are_valid, get_shipcall_id_dictionary, get_participant_type_from_user_data +from BreCal.database.sql_handler import get_assigned_participant_of_type from BreCal.database.sql_handler import execute_sql_query_standalone from BreCal.validators.validation_base_utils import check_if_int_is_valid_flag from BreCal.validators.validation_base_utils import check_if_string_has_special_characters @@ -482,12 +483,12 @@ class InputValidationShipcall(): a) belong to the ASSIGNED agency participant group b) belong to a BSMD participant, if the assigned agency has enabled the bit flag - When there is not yet an assigned agency for the respective shipcall, the request fails, and the user is considered as not authorized. + When there is not yet an assigned agency for the respective shipcall, only BSMD users are authorized. This mechanism prevents self-assignment of an agency to arbitrary shipcalls. """ ### preparation ### # use the decoded JWT token and extract the participant type & participant id - participant_id = user_data.get("participant_id") + user_participant_id = user_data.get("participant_id") participant_type = get_participant_type_from_user_data(user_data) user_is_bsmd = (ParticipantType.BSMD in participant_type) @@ -497,42 +498,41 @@ class InputValidationShipcall(): ### AGENCY in SPM ### # determine, who is assigned as the agency for the shipcall if shipcall_participant_map is None: - query = 'SELECT * FROM shipcall_participant_map where (shipcall_id = ?shipcall_id? AND type=?participant_type?)' - assigned_agency = execute_sql_query_standalone(query=query, model=ShipcallParticipantMap, param={"shipcall_id" : shipcall_id, "participant_type":int(ParticipantType.AGENCY)}) + # query = 'SELECT * FROM shipcall_participant_map where (shipcall_id = ?shipcall_id? AND type=?participant_type?)' + # assigned_agency = execute_sql_query_standalone(query=query, model=ShipcallParticipantMap, param={"shipcall_id" : shipcall_id, "participant_type":int(ParticipantType.AGENCY)}) + assigned_agency = get_assigned_participant_of_type(shipcall_id, participant_type=ParticipantType.AGENCY) + an_agency_is_assigned = True if assigned_agency is not None else False + else: - assigned_agency = [spm for spm in shipcall_participant_map if int(spm.type) == int(ParticipantType.AGENCY)] - - an_agency_is_assigned = len(assigned_agency)==1 - if len(assigned_agency)>1: - raise ValidationError({"internal_error":f"Internal error? Found more than one assigned agency for the shipcall with ID {shipcall_id}. Found: {assigned_agency}"}) - - if an_agency_is_assigned: # Agency assigned? User must belong to the assigned agency or be a BSMD user, in case the flag is set + assigned_agency = [spm for spm in shipcall_participant_map if int(spm.type) == int(ParticipantType.AGENCY)] + an_agency_is_assigned = len(assigned_agency)==1 + + if len(assigned_agency)>1: + raise ValidationError({"internal_error":f"Internal error? Found more than one assigned agency for the shipcall with ID {shipcall_id}. Found: {assigned_agency}"}) assigned_agency = assigned_agency[0] - # determine, whether the assigned agency has set the BSMD-flag to allow BSMD users to edit their assigned shipcalls - query = 'SELECT * FROM participant where (id = ?participant_id?)' - agency_participant = execute_sql_query_standalone(query=query, param={"participant_id" : assigned_agency.participant_id}, command_type="single", model=Participant) + if an_agency_is_assigned: + assert isinstance(assigned_agency, Participant), f"expecting the assigency agency to be a Participant object. Found: {type(assigned_agency)}" + assert isinstance(assigned_agency.flags, int), f"this method has currently only been developed with 'flags' being set as an integer. Found: {type(assigned_agency.flags)}" - assert isinstance(agency_participant.flags, int), f"this method has currently only been developed with 'flags' being set as an integer. Found: {type(agency_participant.flags)}" - agency_has_bsmd_flag = agency_participant.flags==1 # once the flags are an IntFlag, change the boolean check to: (ParticipantFlag.BSMD in agency_participant.flags) + # determine, whether the assigned agency has set the BSMD-flag to allow BSMD users to edit their assigned shipcalls + agency_has_bsmd_flag = assigned_agency.flags==1 # once the flags are an IntFlag, change the boolean check to: (ParticipantFlag.BSMD in agency_participant.flags) ### USER authority ### # determine, whether the user is a) the assigned agency or b) a BSMD participant - user_is_assigned_agency = (participant_id == assigned_agency.participant_id) + user_is_assigned_agency = (user_participant_id == assigned_agency.participant_id) # when the BSMD flag is set: the user must be either BSMD or the assigned agency # when the BSMD flag is not set: the user must be the assigned agency - user_is_authorized = (user_is_bsmd or user_is_assigned_agency) if agency_has_bsmd_flag else user_is_assigned_agency + user_is_authorized = (user_is_bsmd or user_is_assigned_agency) #if agency_has_bsmd_flag else user_is_assigned_agency if not user_is_authorized: - raise werkzeug.exceptions.Forbidden(f"PUT Requests for shipcalls can only be issued by an assigned AGENCY or BSMD users (if the special-flag is enabled). Assigned Agency: {assigned_agency} with Flags: {agency_participant.flags}") # Forbidden: 403 + raise werkzeug.exceptions.Forbidden(f"PUT Requests for shipcalls can only be issued by an assigned AGENCY or BSMD users (if the special-flag is enabled). Assigned Agency: {assigned_agency} with Flags: {assigned_agency.flags}") # Forbidden: 403 else: # when there is no assigned agency, only BSMD users can update the shipcall - user_is_authorized = user_is_bsmd - - if not user_is_authorized: + if not user_is_bsmd: raise werkzeug.exceptions.Forbidden(f"PUT Requests for shipcalls can only be issued by an assigned AGENCY or BSMD users (if the special-flag is enabled). There is no assigned agency yet, so only BSMD users can change datasets.") # part of a pytest.raises. Forbidden: 403 return diff --git a/src/server/BreCal/validators/input_validation_times.py b/src/server/BreCal/validators/input_validation_times.py index 10db2e9..2fd520b 100644 --- a/src/server/BreCal/validators/input_validation_times.py +++ b/src/server/BreCal/validators/input_validation_times.py @@ -15,6 +15,7 @@ from BreCal.database.enums import ParticipantType, ParticipantFlag from BreCal.validators.input_validation_utils import check_if_user_is_bsmd_type, check_if_ship_id_is_valid, check_if_berth_id_is_valid, check_if_participant_ids_are_valid, check_if_participant_ids_and_types_are_valid, check_if_shipcall_id_is_valid, get_shipcall_id_dictionary, get_participant_type_from_user_data, get_participant_id_dictionary, check_if_participant_id_is_valid_standalone from BreCal.database.sql_queries import SQLQuery from BreCal.database.sql_handler import execute_sql_query_standalone +from BreCal.database.sql_handler import get_assigned_participant_of_type from BreCal.validators.validation_base_utils import check_if_int_is_valid_flag, check_if_string_has_special_characters import werkzeug @@ -379,45 +380,68 @@ class InputValidationTimes(): # identify the user's participant id user_participant_id = user_data["participant_id"] + """ # #TODO: + First of all, this method is shared for PUT and DELETE requests. + PUT) is based on the loadedModel + DELETE) is based on the times_id + Both of them share the {user_data}-argument + + These arguments are used to obtain shipcall_id, participant_type (of the times entry) and times_assigned_participant + + there should be the following authorization approaches + a) the user has the participant ID of the assigned entry for a given role + for this, we need: + 1) user_participant_id + 2) times_participant_type + 3) SPM: assigned participant of the respective type (times_assigned_participant) + _ = get_assigned_participant_of_type(shipcall_id, participant_type=ParticipantType.WHATTYPE) + + b) the user is the assigned agency (or the BSMD if allowed) + for this, we need: + 1) assigned_agency + assigned_agency = get_assigned_participant_of_type(shipcall_id, participant_type=ParticipantType.AGENCY) + 2) agency's flag + assigned_agency.flags + 3) user_is_bsmd boolean + """ + # commonly used in the PUT-request if loadedModel is not None: - shipcall_id = loadedModel["shipcall_id"] - participant_type = loadedModel["participant_type"] - - # get the matching entry from the shipcall participant map. Raise an error, when there is no match. - participant_id_of_times_dataset = InputValidationTimes.get_participant_id_from_shipcall_participant_map(shipcall_id, participant_type) + (shipcall_id, times_assigned_participant) = InputValidationTimes.prepare_authority_check_for_put_request(loadedModel) # commonly used in the DELETE-request if times_id is not None: - if pdata is None: # regular behavior. pdata is only defined in unit tests. - # perform an SQL query. Creates a pooled connection internally, queries the database, then closes the connection. - query = "SELECT participant_id, participant_type, shipcall_id FROM times WHERE id = ?id?" - pdata = execute_sql_query_standalone(query=query, param={"id":times_id}, pooledConnection=None) + # #TODO_refactor: + (shipcall_id, times_assigned_participant) = InputValidationTimes.prepare_authority_check_for_delete_request(times_id, pdata) - # extracts the participant_id from the first matching entry, if applicable - if not len(pdata)>0: - # this case is usually covered by the InputValidationTimes.check_if_entry_is_already_deleted method already - raise ValidationError({"times_id":f"Unknown times_id. Could not find a matching entry for ID: {times_id}"}) - else: - participant_type = pdata[0].get("participant_type") - shipcall_id = pdata[0].get("shipcall_id") + # get the matching entry from the shipcall participant map, where the role matches. Raise an error, when there is no match. + assigned_agency = get_assigned_participant_of_type(shipcall_id, participant_type=ParticipantType.AGENCY) - # get the matching entry from the shipcall participant map. Raise an error, when there is no match. - participant_id_of_times_dataset = pdata[0].get("participant_id") - # participant_id_of_times_dataset = InputValidationTimes.get_participant_id_from_shipcall_participant_map(shipcall_id, participant_type) - - # when the user's participant id is different from the times dataset, an exception is raised - if user_participant_id != participant_id_of_times_dataset: - # for some AGENCY participants, users with the BSMD flag may also edit the datasets - special_case__bsmd_may_edit_agency_dataset = InputValidationTimes.check_if_bsmd_may_edit_agency_dataset(user_participant_id, participant_id_of_times_dataset, participant_type) - if special_case__bsmd_may_edit_agency_dataset: - return - else: - raise ValidationError({"user_participant_type":f"The dataset may only be changed by a user belonging to the same participant group as the times dataset is referring to. User participant_id: {user_participant_id}; Dataset participant_id: {participant_id_of_times_dataset}"}) - return + # a) the user has the participant ID of the assigned entry for a given role + user_is_assigned_role = user_participant_id == times_assigned_participant.id + # b) the user is the assigned agency + user_is_assigned_agency = user_participant_id == assigned_agency.id + + # c) the user is BSMD, if the assigned agency allows that + assigned_agency_has_bsmd_flag = assigned_agency.flags == 1 + user_is_bsmd_type = check_if_user_is_bsmd_type(user_data={"participant_id":user_participant_id}) + user_is_bsmd_and_assigned_agency_has_flag = assigned_agency_has_bsmd_flag & user_is_bsmd_type + + if user_is_assigned_role: + return + + elif user_is_assigned_agency: + return + + elif user_is_bsmd_and_assigned_agency_has_flag: + return + + else: + raise ValidationError({"user_participant_type": f"The dataset may only be changed by a user belonging to the same participant group as the times dataset is referring to. Alternatively, the assigned agency may edit and delete the dataset. As a special case, BSMD users may edit and delete times datasets, when the assigned agency allows that. User participant_id: {user_participant_id}; Dataset participant_id: {participant_id_of_times_dataset}"}) + @staticmethod - def get_participant_id_from_shipcall_participant_map(shipcall_id:int, participant_type:int, spm_shipcall_data=None)->int: + def get_participant_id_from_shipcall_participant_map(shipcall_id:typing.Optional[int], participant_type:int, spm_shipcall_data=None)->int: """use shipcall_id and participant_type to identify the matching participant_id""" if shipcall_id is None: raise ValidationError({"shipcall_id":f"Could not find a referenced shipcall_id within the request."}) @@ -479,6 +503,42 @@ class InputValidationTimes(): has_bsmd_flag = ParticipantFlag.BSMD in [ParticipantFlag(participant.get("flags"))] return has_bsmd_flag + + @staticmethod + def prepare_authority_check_for_put_request(loadedModel)->typing.Tuple[int,Participant]: + """extracts the loadedModel to obtain relevant arguments""" + shipcall_id = loadedModel["shipcall_id"] + participant_type = loadedModel["participant_type"] + + # get the matching entry from the shipcall participant map, where the role matches. Raise an error, when there is no match. + times_assigned_participant = get_assigned_participant_of_type(shipcall_id, participant_type=participant_type) + + if times_assigned_participant is None: + raise ValidationError({"participant_type":"the requested participant type is not assigned to the shipcall."}) + return (shipcall_id, times_assigned_participant) + + @staticmethod + def prepare_authority_check_for_delete_request(times_id, pdata=None)->typing.Tuple[int,Participant]: + if pdata is None: # regular behavior. pdata is only defined in unit tests. + # perform an SQL query. Creates a pooled connection internally, queries the database, then closes the connection. + query = "SELECT participant_id, participant_type, shipcall_id FROM times WHERE id = ?id?" + pdata = execute_sql_query_standalone(query=query, param={"id":times_id}, pooledConnection=None) + + # extracts the participant_id from the first matching entry, if applicable + if not len(pdata)>0: + # this case is usually covered by the InputValidationTimes.check_if_entry_is_already_deleted method already + raise ValidationError({"times_id":f"Unknown times_id. Could not find a matching entry for ID: {times_id}"}) + else: + participant_type = pdata[0].get("participant_type") + shipcall_id = pdata[0].get("shipcall_id") + + # get the matching entry from the shipcall participant map, where the role matches. Raise an error, when there is no match. + times_assigned_participant = get_assigned_participant_of_type(shipcall_id, participant_type=participant_type) + + if times_assigned_participant is None: + raise ValidationError({"participant_type":"the requested participant type is not assigned to the shipcall."}) + return (shipcall_id, times_assigned_participant) + def deprecated_build_post_data_type_dependent_required_fields_dict()->dict[ShipcallType,dict[ParticipantType,typing.Optional[list[str]]]]: