From 6eaec257a8e0da99c9c5f3121490410180bcb322 Mon Sep 17 00:00:00 2001 From: Max Metz Date: Mon, 14 Oct 2024 12:33:02 +0200 Subject: [PATCH] docstrings and BSMD-flag handling --- src/server/BreCal/database/sql_handler.py | 30 +++++ .../validators/input_validation_times.py | 120 +++++++++++++----- 2 files changed, 120 insertions(+), 30 deletions(-) diff --git a/src/server/BreCal/database/sql_handler.py b/src/server/BreCal/database/sql_handler.py index 5440f20..2efbf18 100644 --- a/src/server/BreCal/database/sql_handler.py +++ b/src/server/BreCal/database/sql_handler.py @@ -6,6 +6,9 @@ import typing from BreCal.schemas.model import Shipcall, Ship, Participant, Berth, User, Times, ShipcallParticipantMap from BreCal.database.enums import ParticipantType from BreCal.local_db import getPoolConnection +from BreCal.database.sql_queries import SQLQuery +from BreCal.schemas import model + def pandas_series_to_data_model(): return @@ -89,7 +92,14 @@ def execute_sql_query_standalone(query, param={}, pooledConnection=None, model=N schemas = commands.query_single_or_default(query, sentinel, param=param) if model is None else commands.query_single_or_default(query, sentinel, param=param, model=model) if schemas is sentinel: raise Exception("no such record") + elif command_type=="single_or_none": + sentinel = object() + # pulls a *single* row from the query. Typically, these queries require an ID within the param dictionary. + # when providing a model, such as model.Shipcall, the dataset is immediately translated into a data model. + schemas = commands.query_single_or_default(query, sentinel, param=param) if model is None else commands.query_single_or_default(query, sentinel, param=param, model=model) + schemas = None if schemas is sentinel else schemas + elif command_type=="execute_scalar": schemas = commands.execute_scalar(query) @@ -100,6 +110,26 @@ def execute_sql_query_standalone(query, param={}, pooledConnection=None, model=N if rebuild_pooled_connection: pooledConnection.close() return schemas + +def get_assigned_participant_of_type(shipcall_id:int, participant_type:typing.Union[int,model.ParticipantType])->typing.Optional[model.Participant]: + """obtains the ShipcallParticipantMap of a given shipcall and finds the participant id of a desired type. Finally, returns the respective Participant""" + spm_shipcall_data = execute_sql_query_standalone( + query=SQLQuery.get_shipcall_participant_map_by_shipcall_id_and_type(), + param={"id":shipcall_id, "type":participant_type}, + command_type="query") # returns a list of matches + + if len(spm_shipcall_data)==0: + return None + + query = 'SELECT * FROM participant WHERE id=?participant_id?' + assigned_participant = execute_sql_query_standalone( + query=query, + param={"participant_id":spm_shipcall_data[0]["participant_id"]}, + model=model.Participant, + command_type="single_or_none" + ) # returns a list of matches + return assigned_participant + class SQLHandler(): """ diff --git a/src/server/BreCal/validators/input_validation_times.py b/src/server/BreCal/validators/input_validation_times.py index 10db2e9..2fd520b 100644 --- a/src/server/BreCal/validators/input_validation_times.py +++ b/src/server/BreCal/validators/input_validation_times.py @@ -15,6 +15,7 @@ from BreCal.database.enums import ParticipantType, ParticipantFlag from BreCal.validators.input_validation_utils import check_if_user_is_bsmd_type, check_if_ship_id_is_valid, check_if_berth_id_is_valid, check_if_participant_ids_are_valid, check_if_participant_ids_and_types_are_valid, check_if_shipcall_id_is_valid, get_shipcall_id_dictionary, get_participant_type_from_user_data, get_participant_id_dictionary, check_if_participant_id_is_valid_standalone from BreCal.database.sql_queries import SQLQuery from BreCal.database.sql_handler import execute_sql_query_standalone +from BreCal.database.sql_handler import get_assigned_participant_of_type from BreCal.validators.validation_base_utils import check_if_int_is_valid_flag, check_if_string_has_special_characters import werkzeug @@ -379,45 +380,68 @@ class InputValidationTimes(): # identify the user's participant id user_participant_id = user_data["participant_id"] + """ # #TODO: + First of all, this method is shared for PUT and DELETE requests. + PUT) is based on the loadedModel + DELETE) is based on the times_id + Both of them share the {user_data}-argument + + These arguments are used to obtain shipcall_id, participant_type (of the times entry) and times_assigned_participant + + there should be the following authorization approaches + a) the user has the participant ID of the assigned entry for a given role + for this, we need: + 1) user_participant_id + 2) times_participant_type + 3) SPM: assigned participant of the respective type (times_assigned_participant) + _ = get_assigned_participant_of_type(shipcall_id, participant_type=ParticipantType.WHATTYPE) + + b) the user is the assigned agency (or the BSMD if allowed) + for this, we need: + 1) assigned_agency + assigned_agency = get_assigned_participant_of_type(shipcall_id, participant_type=ParticipantType.AGENCY) + 2) agency's flag + assigned_agency.flags + 3) user_is_bsmd boolean + """ + # commonly used in the PUT-request if loadedModel is not None: - shipcall_id = loadedModel["shipcall_id"] - participant_type = loadedModel["participant_type"] - - # get the matching entry from the shipcall participant map. Raise an error, when there is no match. - participant_id_of_times_dataset = InputValidationTimes.get_participant_id_from_shipcall_participant_map(shipcall_id, participant_type) + (shipcall_id, times_assigned_participant) = InputValidationTimes.prepare_authority_check_for_put_request(loadedModel) # commonly used in the DELETE-request if times_id is not None: - if pdata is None: # regular behavior. pdata is only defined in unit tests. - # perform an SQL query. Creates a pooled connection internally, queries the database, then closes the connection. - query = "SELECT participant_id, participant_type, shipcall_id FROM times WHERE id = ?id?" - pdata = execute_sql_query_standalone(query=query, param={"id":times_id}, pooledConnection=None) + # #TODO_refactor: + (shipcall_id, times_assigned_participant) = InputValidationTimes.prepare_authority_check_for_delete_request(times_id, pdata) - # extracts the participant_id from the first matching entry, if applicable - if not len(pdata)>0: - # this case is usually covered by the InputValidationTimes.check_if_entry_is_already_deleted method already - raise ValidationError({"times_id":f"Unknown times_id. Could not find a matching entry for ID: {times_id}"}) - else: - participant_type = pdata[0].get("participant_type") - shipcall_id = pdata[0].get("shipcall_id") + # get the matching entry from the shipcall participant map, where the role matches. Raise an error, when there is no match. + assigned_agency = get_assigned_participant_of_type(shipcall_id, participant_type=ParticipantType.AGENCY) - # get the matching entry from the shipcall participant map. Raise an error, when there is no match. - participant_id_of_times_dataset = pdata[0].get("participant_id") - # participant_id_of_times_dataset = InputValidationTimes.get_participant_id_from_shipcall_participant_map(shipcall_id, participant_type) - - # when the user's participant id is different from the times dataset, an exception is raised - if user_participant_id != participant_id_of_times_dataset: - # for some AGENCY participants, users with the BSMD flag may also edit the datasets - special_case__bsmd_may_edit_agency_dataset = InputValidationTimes.check_if_bsmd_may_edit_agency_dataset(user_participant_id, participant_id_of_times_dataset, participant_type) - if special_case__bsmd_may_edit_agency_dataset: - return - else: - raise ValidationError({"user_participant_type":f"The dataset may only be changed by a user belonging to the same participant group as the times dataset is referring to. User participant_id: {user_participant_id}; Dataset participant_id: {participant_id_of_times_dataset}"}) - return + # a) the user has the participant ID of the assigned entry for a given role + user_is_assigned_role = user_participant_id == times_assigned_participant.id + # b) the user is the assigned agency + user_is_assigned_agency = user_participant_id == assigned_agency.id + + # c) the user is BSMD, if the assigned agency allows that + assigned_agency_has_bsmd_flag = assigned_agency.flags == 1 + user_is_bsmd_type = check_if_user_is_bsmd_type(user_data={"participant_id":user_participant_id}) + user_is_bsmd_and_assigned_agency_has_flag = assigned_agency_has_bsmd_flag & user_is_bsmd_type + + if user_is_assigned_role: + return + + elif user_is_assigned_agency: + return + + elif user_is_bsmd_and_assigned_agency_has_flag: + return + + else: + raise ValidationError({"user_participant_type": f"The dataset may only be changed by a user belonging to the same participant group as the times dataset is referring to. Alternatively, the assigned agency may edit and delete the dataset. As a special case, BSMD users may edit and delete times datasets, when the assigned agency allows that. User participant_id: {user_participant_id}; Dataset participant_id: {participant_id_of_times_dataset}"}) + @staticmethod - def get_participant_id_from_shipcall_participant_map(shipcall_id:int, participant_type:int, spm_shipcall_data=None)->int: + def get_participant_id_from_shipcall_participant_map(shipcall_id:typing.Optional[int], participant_type:int, spm_shipcall_data=None)->int: """use shipcall_id and participant_type to identify the matching participant_id""" if shipcall_id is None: raise ValidationError({"shipcall_id":f"Could not find a referenced shipcall_id within the request."}) @@ -479,6 +503,42 @@ class InputValidationTimes(): has_bsmd_flag = ParticipantFlag.BSMD in [ParticipantFlag(participant.get("flags"))] return has_bsmd_flag + + @staticmethod + def prepare_authority_check_for_put_request(loadedModel)->typing.Tuple[int,Participant]: + """extracts the loadedModel to obtain relevant arguments""" + shipcall_id = loadedModel["shipcall_id"] + participant_type = loadedModel["participant_type"] + + # get the matching entry from the shipcall participant map, where the role matches. Raise an error, when there is no match. + times_assigned_participant = get_assigned_participant_of_type(shipcall_id, participant_type=participant_type) + + if times_assigned_participant is None: + raise ValidationError({"participant_type":"the requested participant type is not assigned to the shipcall."}) + return (shipcall_id, times_assigned_participant) + + @staticmethod + def prepare_authority_check_for_delete_request(times_id, pdata=None)->typing.Tuple[int,Participant]: + if pdata is None: # regular behavior. pdata is only defined in unit tests. + # perform an SQL query. Creates a pooled connection internally, queries the database, then closes the connection. + query = "SELECT participant_id, participant_type, shipcall_id FROM times WHERE id = ?id?" + pdata = execute_sql_query_standalone(query=query, param={"id":times_id}, pooledConnection=None) + + # extracts the participant_id from the first matching entry, if applicable + if not len(pdata)>0: + # this case is usually covered by the InputValidationTimes.check_if_entry_is_already_deleted method already + raise ValidationError({"times_id":f"Unknown times_id. Could not find a matching entry for ID: {times_id}"}) + else: + participant_type = pdata[0].get("participant_type") + shipcall_id = pdata[0].get("shipcall_id") + + # get the matching entry from the shipcall participant map, where the role matches. Raise an error, when there is no match. + times_assigned_participant = get_assigned_participant_of_type(shipcall_id, participant_type=participant_type) + + if times_assigned_participant is None: + raise ValidationError({"participant_type":"the requested participant type is not assigned to the shipcall."}) + return (shipcall_id, times_assigned_participant) + def deprecated_build_post_data_type_dependent_required_fields_dict()->dict[ShipcallType,dict[ParticipantType,typing.Optional[list[str]]]]: