regardless of the BSMD flag, BSMD users are now able to perform shipc… (#51)

* regardless of the BSMD flag, BSMD users are now able to perform shipcall PUT-requests

* regardless of the BSMD flag, BSMD users are now able to perform shipcall PUT-requests

* docstrings and BSMD-flag handling
This commit is contained in:
scopesorting 2024-10-15 15:19:08 +02:00 committed by Daniel Schick
parent 4a0943c64f
commit fb8b732b1d
3 changed files with 142 additions and 52 deletions

View File

@ -6,6 +6,9 @@ import typing
from BreCal.schemas.model import Shipcall, Ship, Participant, Berth, User, Times, ShipcallParticipantMap
from BreCal.database.enums import ParticipantType
from BreCal.local_db import getPoolConnection
from BreCal.database.sql_queries import SQLQuery
from BreCal.schemas import model
def pandas_series_to_data_model():
return
@ -89,6 +92,13 @@ def execute_sql_query_standalone(query, param={}, pooledConnection=None, model=N
schemas = commands.query_single_or_default(query, sentinel, param=param) if model is None else commands.query_single_or_default(query, sentinel, param=param, model=model)
if schemas is sentinel:
raise Exception("no such record")
elif command_type=="single_or_none":
sentinel = object()
# pulls a *single* row from the query. Typically, these queries require an ID within the param dictionary.
# when providing a model, such as model.Shipcall, the dataset is immediately translated into a data model.
schemas = commands.query_single_or_default(query, sentinel, param=param) if model is None else commands.query_single_or_default(query, sentinel, param=param, model=model)
schemas = None if schemas is sentinel else schemas
elif command_type=="execute_scalar":
schemas = commands.execute_scalar(query)
@ -101,6 +111,26 @@ def execute_sql_query_standalone(query, param={}, pooledConnection=None, model=N
pooledConnection.close()
return schemas
def get_assigned_participant_of_type(shipcall_id:int, participant_type:typing.Union[int,model.ParticipantType])->typing.Optional[model.Participant]:
"""obtains the ShipcallParticipantMap of a given shipcall and finds the participant id of a desired type. Finally, returns the respective Participant"""
spm_shipcall_data = execute_sql_query_standalone(
query=SQLQuery.get_shipcall_participant_map_by_shipcall_id_and_type(),
param={"id":shipcall_id, "type":participant_type},
command_type="query") # returns a list of matches
if len(spm_shipcall_data)==0:
return None
query = 'SELECT * FROM participant WHERE id=?participant_id?'
assigned_participant = execute_sql_query_standalone(
query=query,
param={"participant_id":spm_shipcall_data[0]["participant_id"]},
model=model.Participant,
command_type="single_or_none"
) # returns a list of matches
return assigned_participant
class SQLHandler():
"""
An object that reads SQL queries from the sql_connection and stores it in pandas DataFrames. The object can read all available tables

View File

@ -12,6 +12,7 @@ from BreCal.impl.berths import GetBerths
from BreCal.database.enums import ParticipantType, ParticipantFlag
from BreCal.validators.input_validation_utils import check_if_user_is_bsmd_type, check_if_ship_id_is_valid, check_if_berth_id_is_valid, check_if_participant_ids_are_valid, check_if_participant_ids_and_types_are_valid, get_shipcall_id_dictionary, get_participant_type_from_user_data
from BreCal.database.sql_handler import get_assigned_participant_of_type
from BreCal.database.sql_handler import execute_sql_query_standalone
from BreCal.validators.validation_base_utils import check_if_int_is_valid_flag
from BreCal.validators.validation_base_utils import check_if_string_has_special_characters
@ -482,12 +483,12 @@ class InputValidationShipcall():
a) belong to the ASSIGNED agency participant group
b) belong to a BSMD participant, if the assigned agency has enabled the bit flag
When there is not yet an assigned agency for the respective shipcall, the request fails, and the user is considered as not authorized.
When there is not yet an assigned agency for the respective shipcall, only BSMD users are authorized.
This mechanism prevents self-assignment of an agency to arbitrary shipcalls.
"""
### preparation ###
# use the decoded JWT token and extract the participant type & participant id
participant_id = user_data.get("participant_id")
user_participant_id = user_data.get("participant_id")
participant_type = get_participant_type_from_user_data(user_data)
user_is_bsmd = (ParticipantType.BSMD in participant_type)
@ -497,42 +498,41 @@ class InputValidationShipcall():
### AGENCY in SPM ###
# determine, who is assigned as the agency for the shipcall
if shipcall_participant_map is None:
query = 'SELECT * FROM shipcall_participant_map where (shipcall_id = ?shipcall_id? AND type=?participant_type?)'
assigned_agency = execute_sql_query_standalone(query=query, model=ShipcallParticipantMap, param={"shipcall_id" : shipcall_id, "participant_type":int(ParticipantType.AGENCY)})
else:
assigned_agency = [spm for spm in shipcall_participant_map if int(spm.type) == int(ParticipantType.AGENCY)]
# query = 'SELECT * FROM shipcall_participant_map where (shipcall_id = ?shipcall_id? AND type=?participant_type?)'
# assigned_agency = execute_sql_query_standalone(query=query, model=ShipcallParticipantMap, param={"shipcall_id" : shipcall_id, "participant_type":int(ParticipantType.AGENCY)})
assigned_agency = get_assigned_participant_of_type(shipcall_id, participant_type=ParticipantType.AGENCY)
an_agency_is_assigned = True if assigned_agency is not None else False
else:
# Agency assigned? User must belong to the assigned agency or be a BSMD user, in case the flag is set
assigned_agency = [spm for spm in shipcall_participant_map if int(spm.type) == int(ParticipantType.AGENCY)]
an_agency_is_assigned = len(assigned_agency)==1
if len(assigned_agency)>1:
raise ValidationError({"internal_error":f"Internal error? Found more than one assigned agency for the shipcall with ID {shipcall_id}. Found: {assigned_agency}"})
if an_agency_is_assigned:
# Agency assigned? User must belong to the assigned agency or be a BSMD user, in case the flag is set
assigned_agency = assigned_agency[0]
# determine, whether the assigned agency has set the BSMD-flag to allow BSMD users to edit their assigned shipcalls
query = 'SELECT * FROM participant where (id = ?participant_id?)'
agency_participant = execute_sql_query_standalone(query=query, param={"participant_id" : assigned_agency.participant_id}, command_type="single", model=Participant)
if an_agency_is_assigned:
assert isinstance(assigned_agency, Participant), f"expecting the assigency agency to be a Participant object. Found: {type(assigned_agency)}"
assert isinstance(assigned_agency.flags, int), f"this method has currently only been developed with 'flags' being set as an integer. Found: {type(assigned_agency.flags)}"
assert isinstance(agency_participant.flags, int), f"this method has currently only been developed with 'flags' being set as an integer. Found: {type(agency_participant.flags)}"
agency_has_bsmd_flag = agency_participant.flags==1 # once the flags are an IntFlag, change the boolean check to: (ParticipantFlag.BSMD in agency_participant.flags)
# determine, whether the assigned agency has set the BSMD-flag to allow BSMD users to edit their assigned shipcalls
agency_has_bsmd_flag = assigned_agency.flags==1 # once the flags are an IntFlag, change the boolean check to: (ParticipantFlag.BSMD in agency_participant.flags)
### USER authority ###
# determine, whether the user is a) the assigned agency or b) a BSMD participant
user_is_assigned_agency = (participant_id == assigned_agency.participant_id)
user_is_assigned_agency = (user_participant_id == assigned_agency.participant_id)
# when the BSMD flag is set: the user must be either BSMD or the assigned agency
# when the BSMD flag is not set: the user must be the assigned agency
user_is_authorized = (user_is_bsmd or user_is_assigned_agency) if agency_has_bsmd_flag else user_is_assigned_agency
user_is_authorized = (user_is_bsmd or user_is_assigned_agency) #if agency_has_bsmd_flag else user_is_assigned_agency
if not user_is_authorized:
raise werkzeug.exceptions.Forbidden(f"PUT Requests for shipcalls can only be issued by an assigned AGENCY or BSMD users (if the special-flag is enabled). Assigned Agency: {assigned_agency} with Flags: {agency_participant.flags}") # Forbidden: 403
raise werkzeug.exceptions.Forbidden(f"PUT Requests for shipcalls can only be issued by an assigned AGENCY or BSMD users (if the special-flag is enabled). Assigned Agency: {assigned_agency} with Flags: {assigned_agency.flags}") # Forbidden: 403
else:
# when there is no assigned agency, only BSMD users can update the shipcall
user_is_authorized = user_is_bsmd
if not user_is_authorized:
if not user_is_bsmd:
raise werkzeug.exceptions.Forbidden(f"PUT Requests for shipcalls can only be issued by an assigned AGENCY or BSMD users (if the special-flag is enabled). There is no assigned agency yet, so only BSMD users can change datasets.") # part of a pytest.raises. Forbidden: 403
return

View File

@ -15,6 +15,7 @@ from BreCal.database.enums import ParticipantType, ParticipantFlag
from BreCal.validators.input_validation_utils import check_if_user_is_bsmd_type, check_if_ship_id_is_valid, check_if_berth_id_is_valid, check_if_participant_ids_are_valid, check_if_participant_ids_and_types_are_valid, check_if_shipcall_id_is_valid, get_shipcall_id_dictionary, get_participant_type_from_user_data, get_participant_id_dictionary, check_if_participant_id_is_valid_standalone
from BreCal.database.sql_queries import SQLQuery
from BreCal.database.sql_handler import execute_sql_query_standalone
from BreCal.database.sql_handler import get_assigned_participant_of_type
from BreCal.validators.validation_base_utils import check_if_int_is_valid_flag, check_if_string_has_special_characters
import werkzeug
@ -379,45 +380,68 @@ class InputValidationTimes():
# identify the user's participant id
user_participant_id = user_data["participant_id"]
""" # #TODO:
First of all, this method is shared for PUT and DELETE requests.
PUT) is based on the loadedModel
DELETE) is based on the times_id
Both of them share the {user_data}-argument
These arguments are used to obtain shipcall_id, participant_type (of the times entry) and times_assigned_participant
there should be the following authorization approaches
a) the user has the participant ID of the assigned entry for a given role
for this, we need:
1) user_participant_id
2) times_participant_type
3) SPM: assigned participant of the respective type (times_assigned_participant)
_ = get_assigned_participant_of_type(shipcall_id, participant_type=ParticipantType.WHATTYPE)
b) the user is the assigned agency (or the BSMD if allowed)
for this, we need:
1) assigned_agency
assigned_agency = get_assigned_participant_of_type(shipcall_id, participant_type=ParticipantType.AGENCY)
2) agency's flag
assigned_agency.flags
3) user_is_bsmd boolean
"""
# commonly used in the PUT-request
if loadedModel is not None:
shipcall_id = loadedModel["shipcall_id"]
participant_type = loadedModel["participant_type"]
# get the matching entry from the shipcall participant map. Raise an error, when there is no match.
participant_id_of_times_dataset = InputValidationTimes.get_participant_id_from_shipcall_participant_map(shipcall_id, participant_type)
(shipcall_id, times_assigned_participant) = InputValidationTimes.prepare_authority_check_for_put_request(loadedModel)
# commonly used in the DELETE-request
if times_id is not None:
if pdata is None: # regular behavior. pdata is only defined in unit tests.
# perform an SQL query. Creates a pooled connection internally, queries the database, then closes the connection.
query = "SELECT participant_id, participant_type, shipcall_id FROM times WHERE id = ?id?"
pdata = execute_sql_query_standalone(query=query, param={"id":times_id}, pooledConnection=None)
# #TODO_refactor:
(shipcall_id, times_assigned_participant) = InputValidationTimes.prepare_authority_check_for_delete_request(times_id, pdata)
# extracts the participant_id from the first matching entry, if applicable
if not len(pdata)>0:
# this case is usually covered by the InputValidationTimes.check_if_entry_is_already_deleted method already
raise ValidationError({"times_id":f"Unknown times_id. Could not find a matching entry for ID: {times_id}"})
else:
participant_type = pdata[0].get("participant_type")
shipcall_id = pdata[0].get("shipcall_id")
# get the matching entry from the shipcall participant map, where the role matches. Raise an error, when there is no match.
assigned_agency = get_assigned_participant_of_type(shipcall_id, participant_type=ParticipantType.AGENCY)
# get the matching entry from the shipcall participant map. Raise an error, when there is no match.
participant_id_of_times_dataset = pdata[0].get("participant_id")
# participant_id_of_times_dataset = InputValidationTimes.get_participant_id_from_shipcall_participant_map(shipcall_id, participant_type)
# a) the user has the participant ID of the assigned entry for a given role
user_is_assigned_role = user_participant_id == times_assigned_participant.id
# when the user's participant id is different from the times dataset, an exception is raised
if user_participant_id != participant_id_of_times_dataset:
# for some AGENCY participants, users with the BSMD flag may also edit the datasets
special_case__bsmd_may_edit_agency_dataset = InputValidationTimes.check_if_bsmd_may_edit_agency_dataset(user_participant_id, participant_id_of_times_dataset, participant_type)
if special_case__bsmd_may_edit_agency_dataset:
# b) the user is the assigned agency
user_is_assigned_agency = user_participant_id == assigned_agency.id
# c) the user is BSMD, if the assigned agency allows that
assigned_agency_has_bsmd_flag = assigned_agency.flags == 1
user_is_bsmd_type = check_if_user_is_bsmd_type(user_data={"participant_id":user_participant_id})
user_is_bsmd_and_assigned_agency_has_flag = assigned_agency_has_bsmd_flag & user_is_bsmd_type
if user_is_assigned_role:
return
else:
raise ValidationError({"user_participant_type":f"The dataset may only be changed by a user belonging to the same participant group as the times dataset is referring to. User participant_id: {user_participant_id}; Dataset participant_id: {participant_id_of_times_dataset}"})
elif user_is_assigned_agency:
return
elif user_is_bsmd_and_assigned_agency_has_flag:
return
else:
raise ValidationError({"user_participant_type": f"The dataset may only be changed by a user belonging to the same participant group as the times dataset is referring to. Alternatively, the assigned agency may edit and delete the dataset. As a special case, BSMD users may edit and delete times datasets, when the assigned agency allows that. User participant_id: {user_participant_id}; Dataset participant_id: {participant_id_of_times_dataset}"})
@staticmethod
def get_participant_id_from_shipcall_participant_map(shipcall_id:int, participant_type:int, spm_shipcall_data=None)->int:
def get_participant_id_from_shipcall_participant_map(shipcall_id:typing.Optional[int], participant_type:int, spm_shipcall_data=None)->int:
"""use shipcall_id and participant_type to identify the matching participant_id"""
if shipcall_id is None:
raise ValidationError({"shipcall_id":f"Could not find a referenced shipcall_id within the request."})
@ -480,6 +504,42 @@ class InputValidationTimes():
has_bsmd_flag = ParticipantFlag.BSMD in [ParticipantFlag(participant.get("flags"))]
return has_bsmd_flag
@staticmethod
def prepare_authority_check_for_put_request(loadedModel)->typing.Tuple[int,Participant]:
"""extracts the loadedModel to obtain relevant arguments"""
shipcall_id = loadedModel["shipcall_id"]
participant_type = loadedModel["participant_type"]
# get the matching entry from the shipcall participant map, where the role matches. Raise an error, when there is no match.
times_assigned_participant = get_assigned_participant_of_type(shipcall_id, participant_type=participant_type)
if times_assigned_participant is None:
raise ValidationError({"participant_type":"the requested participant type is not assigned to the shipcall."})
return (shipcall_id, times_assigned_participant)
@staticmethod
def prepare_authority_check_for_delete_request(times_id, pdata=None)->typing.Tuple[int,Participant]:
if pdata is None: # regular behavior. pdata is only defined in unit tests.
# perform an SQL query. Creates a pooled connection internally, queries the database, then closes the connection.
query = "SELECT participant_id, participant_type, shipcall_id FROM times WHERE id = ?id?"
pdata = execute_sql_query_standalone(query=query, param={"id":times_id}, pooledConnection=None)
# extracts the participant_id from the first matching entry, if applicable
if not len(pdata)>0:
# this case is usually covered by the InputValidationTimes.check_if_entry_is_already_deleted method already
raise ValidationError({"times_id":f"Unknown times_id. Could not find a matching entry for ID: {times_id}"})
else:
participant_type = pdata[0].get("participant_type")
shipcall_id = pdata[0].get("shipcall_id")
# get the matching entry from the shipcall participant map, where the role matches. Raise an error, when there is no match.
times_assigned_participant = get_assigned_participant_of_type(shipcall_id, participant_type=participant_type)
if times_assigned_participant is None:
raise ValidationError({"participant_type":"the requested participant type is not assigned to the shipcall."})
return (shipcall_id, times_assigned_participant)
def deprecated_build_post_data_type_dependent_required_fields_dict()->dict[ShipcallType,dict[ParticipantType,typing.Optional[list[str]]]]:
"""