From 5e2cb3f74510ac0136f51f52cdbc1b53948d5f84 Mon Sep 17 00:00:00 2001 From: Daniel Schick Date: Thu, 17 Oct 2024 14:49:51 +0200 Subject: [PATCH] Fixed some more small bugs in validation when only a partial times dataset is put --- src/server/BreCal/__init__.py | 3 +- src/server/BreCal/database/sql_utils.py | 8 +++++ .../validators/input_validation_times.py | 34 +++++++++++-------- 3 files changed, 30 insertions(+), 15 deletions(-) diff --git a/src/server/BreCal/__init__.py b/src/server/BreCal/__init__.py index ad6b3d0..7fcd280 100644 --- a/src/server/BreCal/__init__.py +++ b/src/server/BreCal/__init__.py @@ -51,7 +51,8 @@ def create_app(test_config=None, instance_path=None): try: import os print(f'Instance path = {app.instance_path}') - os.makedirs(app.instance_path) + if not os.path.exists(app.instance_path): + os.makedirs(app.instance_path) except OSError: pass diff --git a/src/server/BreCal/database/sql_utils.py b/src/server/BreCal/database/sql_utils.py index 5780ace..d4aefef 100644 --- a/src/server/BreCal/database/sql_utils.py +++ b/src/server/BreCal/database/sql_utils.py @@ -12,3 +12,11 @@ def get_user_data_for_id(user_id:int, expiration_time:int=90): user_data["exp"] = (datetime.datetime.now()+datetime.timedelta(minutes=expiration_time)).timestamp() return user_data + +def get_times_data_for_id(times_id:int): + """helper function to load previous times data from the database""" + query = "SELECT * FROM times where id = ?id?" + pdata = execute_sql_query_standalone(query=query, param={"id":times_id}) + pdata = pdata[0] if len(pdata)>0 else None + assert pdata is not None, f"could not find times with id {times_id}" + return pdata \ No newline at end of file diff --git a/src/server/BreCal/validators/input_validation_times.py b/src/server/BreCal/validators/input_validation_times.py index 2fd520b..25c58df 100644 --- a/src/server/BreCal/validators/input_validation_times.py +++ b/src/server/BreCal/validators/input_validation_times.py @@ -16,6 +16,7 @@ from BreCal.validators.input_validation_utils import check_if_user_is_bsmd_type, from BreCal.database.sql_queries import SQLQuery from BreCal.database.sql_handler import execute_sql_query_standalone from BreCal.database.sql_handler import get_assigned_participant_of_type +from BreCal.database.sql_utils import get_times_data_for_id from BreCal.validators.validation_base_utils import check_if_int_is_valid_flag, check_if_string_has_special_characters import werkzeug @@ -98,6 +99,10 @@ class InputValidationTimes(): # 1.) Check for the presence of required fields InputValidationTimes.check_times_required_fields_put_data(content) + # 1.5) Load model from database and set existing fields from dict to model + old_times = get_times_data_for_id(content["id"]) + loadedModel = {**old_times, **loadedModel} + # 2.) Only users of the same participant_id, which the times dataset refers to, can update the entry InputValidationTimes.check_user_belongs_to_same_group_as_dataset_determines(user_data, loadedModel=loadedModel, times_id=None) @@ -411,10 +416,10 @@ class InputValidationTimes(): # commonly used in the DELETE-request if times_id is not None: - # #TODO_refactor: + # #TODO_refactor: (shipcall_id, times_assigned_participant) = InputValidationTimes.prepare_authority_check_for_delete_request(times_id, pdata) - # get the matching entry from the shipcall participant map, where the role matches. Raise an error, when there is no match. + # get the matching entry from the shipcall participant map, where the role matches. Raise an error, when there is no match. assigned_agency = get_assigned_participant_of_type(shipcall_id, participant_type=ParticipantType.AGENCY) # a) the user has the participant ID of the assigned entry for a given role @@ -422,24 +427,25 @@ class InputValidationTimes(): # b) the user is the assigned agency user_is_assigned_agency = user_participant_id == assigned_agency.id - + # c) the user is BSMD, if the assigned agency allows that assigned_agency_has_bsmd_flag = assigned_agency.flags == 1 user_is_bsmd_type = check_if_user_is_bsmd_type(user_data={"participant_id":user_participant_id}) user_is_bsmd_and_assigned_agency_has_flag = assigned_agency_has_bsmd_flag & user_is_bsmd_type - + if user_is_assigned_role: return - + elif user_is_assigned_agency: return - + elif user_is_bsmd_and_assigned_agency_has_flag: return - + else: - raise ValidationError({"user_participant_type": f"The dataset may only be changed by a user belonging to the same participant group as the times dataset is referring to. Alternatively, the assigned agency may edit and delete the dataset. As a special case, BSMD users may edit and delete times datasets, when the assigned agency allows that. User participant_id: {user_participant_id}; Dataset participant_id: {participant_id_of_times_dataset}"}) - + times_participant_id = loadedModel.get("participant_id") + raise ValidationError({"user_participant_type": f"The dataset may only be changed by a user belonging to the same participant group as the times dataset is referring to. Alternatively, the assigned agency may edit and delete the dataset. As a special case, BSMD users may edit and delete times datasets, when the assigned agency allows that. User participant_id: {user_participant_id}; Dataset participant_id: {times_participant_id}"}) + @staticmethod def get_participant_id_from_shipcall_participant_map(shipcall_id:typing.Optional[int], participant_type:int, spm_shipcall_data=None)->int: """use shipcall_id and participant_type to identify the matching participant_id""" @@ -503,20 +509,20 @@ class InputValidationTimes(): has_bsmd_flag = ParticipantFlag.BSMD in [ParticipantFlag(participant.get("flags"))] return has_bsmd_flag - + @staticmethod def prepare_authority_check_for_put_request(loadedModel)->typing.Tuple[int,Participant]: """extracts the loadedModel to obtain relevant arguments""" shipcall_id = loadedModel["shipcall_id"] participant_type = loadedModel["participant_type"] - # get the matching entry from the shipcall participant map, where the role matches. Raise an error, when there is no match. + # get the matching entry from the shipcall participant map, where the role matches. Raise an error, when there is no match. times_assigned_participant = get_assigned_participant_of_type(shipcall_id, participant_type=participant_type) if times_assigned_participant is None: raise ValidationError({"participant_type":"the requested participant type is not assigned to the shipcall."}) return (shipcall_id, times_assigned_participant) - + @staticmethod def prepare_authority_check_for_delete_request(times_id, pdata=None)->typing.Tuple[int,Participant]: if pdata is None: # regular behavior. pdata is only defined in unit tests. @@ -531,8 +537,8 @@ class InputValidationTimes(): else: participant_type = pdata[0].get("participant_type") shipcall_id = pdata[0].get("shipcall_id") - - # get the matching entry from the shipcall participant map, where the role matches. Raise an error, when there is no match. + + # get the matching entry from the shipcall participant map, where the role matches. Raise an error, when there is no match. times_assigned_participant = get_assigned_participant_of_type(shipcall_id, participant_type=participant_type) if times_assigned_participant is None: