regardless of the BSMD flag, BSMD users are now able to perform shipc… (#51)
* regardless of the BSMD flag, BSMD users are now able to perform shipcall PUT-requests * regardless of the BSMD flag, BSMD users are now able to perform shipcall PUT-requests * docstrings and BSMD-flag handling
This commit is contained in:
parent
8fe2a9ebca
commit
c0902c65ee
@ -6,6 +6,9 @@ import typing
|
||||
from BreCal.schemas.model import Shipcall, Ship, Participant, Berth, User, Times, ShipcallParticipantMap
|
||||
from BreCal.database.enums import ParticipantType
|
||||
from BreCal.local_db import getPoolConnection
|
||||
from BreCal.database.sql_queries import SQLQuery
|
||||
from BreCal.schemas import model
|
||||
|
||||
|
||||
def pandas_series_to_data_model():
|
||||
return
|
||||
@ -89,7 +92,14 @@ def execute_sql_query_standalone(query, param={}, pooledConnection=None, model=N
|
||||
schemas = commands.query_single_or_default(query, sentinel, param=param) if model is None else commands.query_single_or_default(query, sentinel, param=param, model=model)
|
||||
if schemas is sentinel:
|
||||
raise Exception("no such record")
|
||||
elif command_type=="single_or_none":
|
||||
sentinel = object()
|
||||
|
||||
# pulls a *single* row from the query. Typically, these queries require an ID within the param dictionary.
|
||||
# when providing a model, such as model.Shipcall, the dataset is immediately translated into a data model.
|
||||
schemas = commands.query_single_or_default(query, sentinel, param=param) if model is None else commands.query_single_or_default(query, sentinel, param=param, model=model)
|
||||
schemas = None if schemas is sentinel else schemas
|
||||
|
||||
elif command_type=="execute_scalar":
|
||||
schemas = commands.execute_scalar(query)
|
||||
|
||||
@ -100,6 +110,26 @@ def execute_sql_query_standalone(query, param={}, pooledConnection=None, model=N
|
||||
if rebuild_pooled_connection:
|
||||
pooledConnection.close()
|
||||
return schemas
|
||||
|
||||
def get_assigned_participant_of_type(shipcall_id:int, participant_type:typing.Union[int,model.ParticipantType])->typing.Optional[model.Participant]:
|
||||
"""obtains the ShipcallParticipantMap of a given shipcall and finds the participant id of a desired type. Finally, returns the respective Participant"""
|
||||
spm_shipcall_data = execute_sql_query_standalone(
|
||||
query=SQLQuery.get_shipcall_participant_map_by_shipcall_id_and_type(),
|
||||
param={"id":shipcall_id, "type":participant_type},
|
||||
command_type="query") # returns a list of matches
|
||||
|
||||
if len(spm_shipcall_data)==0:
|
||||
return None
|
||||
|
||||
query = 'SELECT * FROM participant WHERE id=?participant_id?'
|
||||
assigned_participant = execute_sql_query_standalone(
|
||||
query=query,
|
||||
param={"participant_id":spm_shipcall_data[0]["participant_id"]},
|
||||
model=model.Participant,
|
||||
command_type="single_or_none"
|
||||
) # returns a list of matches
|
||||
return assigned_participant
|
||||
|
||||
|
||||
class SQLHandler():
|
||||
"""
|
||||
|
||||
@ -12,6 +12,7 @@ from BreCal.impl.berths import GetBerths
|
||||
|
||||
from BreCal.database.enums import ParticipantType, ParticipantFlag
|
||||
from BreCal.validators.input_validation_utils import check_if_user_is_bsmd_type, check_if_ship_id_is_valid, check_if_berth_id_is_valid, check_if_participant_ids_are_valid, check_if_participant_ids_and_types_are_valid, get_shipcall_id_dictionary, get_participant_type_from_user_data
|
||||
from BreCal.database.sql_handler import get_assigned_participant_of_type
|
||||
from BreCal.database.sql_handler import execute_sql_query_standalone
|
||||
from BreCal.validators.validation_base_utils import check_if_int_is_valid_flag
|
||||
from BreCal.validators.validation_base_utils import check_if_string_has_special_characters
|
||||
@ -482,12 +483,12 @@ class InputValidationShipcall():
|
||||
a) belong to the ASSIGNED agency participant group
|
||||
b) belong to a BSMD participant, if the assigned agency has enabled the bit flag
|
||||
|
||||
When there is not yet an assigned agency for the respective shipcall, the request fails, and the user is considered as not authorized.
|
||||
When there is not yet an assigned agency for the respective shipcall, only BSMD users are authorized.
|
||||
This mechanism prevents self-assignment of an agency to arbitrary shipcalls.
|
||||
"""
|
||||
### preparation ###
|
||||
# use the decoded JWT token and extract the participant type & participant id
|
||||
participant_id = user_data.get("participant_id")
|
||||
user_participant_id = user_data.get("participant_id")
|
||||
participant_type = get_participant_type_from_user_data(user_data)
|
||||
user_is_bsmd = (ParticipantType.BSMD in participant_type)
|
||||
|
||||
@ -497,42 +498,41 @@ class InputValidationShipcall():
|
||||
### AGENCY in SPM ###
|
||||
# determine, who is assigned as the agency for the shipcall
|
||||
if shipcall_participant_map is None:
|
||||
query = 'SELECT * FROM shipcall_participant_map where (shipcall_id = ?shipcall_id? AND type=?participant_type?)'
|
||||
assigned_agency = execute_sql_query_standalone(query=query, model=ShipcallParticipantMap, param={"shipcall_id" : shipcall_id, "participant_type":int(ParticipantType.AGENCY)})
|
||||
# query = 'SELECT * FROM shipcall_participant_map where (shipcall_id = ?shipcall_id? AND type=?participant_type?)'
|
||||
# assigned_agency = execute_sql_query_standalone(query=query, model=ShipcallParticipantMap, param={"shipcall_id" : shipcall_id, "participant_type":int(ParticipantType.AGENCY)})
|
||||
assigned_agency = get_assigned_participant_of_type(shipcall_id, participant_type=ParticipantType.AGENCY)
|
||||
an_agency_is_assigned = True if assigned_agency is not None else False
|
||||
|
||||
else:
|
||||
assigned_agency = [spm for spm in shipcall_participant_map if int(spm.type) == int(ParticipantType.AGENCY)]
|
||||
|
||||
an_agency_is_assigned = len(assigned_agency)==1
|
||||
if len(assigned_agency)>1:
|
||||
raise ValidationError({"internal_error":f"Internal error? Found more than one assigned agency for the shipcall with ID {shipcall_id}. Found: {assigned_agency}"})
|
||||
|
||||
if an_agency_is_assigned:
|
||||
# Agency assigned? User must belong to the assigned agency or be a BSMD user, in case the flag is set
|
||||
assigned_agency = [spm for spm in shipcall_participant_map if int(spm.type) == int(ParticipantType.AGENCY)]
|
||||
an_agency_is_assigned = len(assigned_agency)==1
|
||||
|
||||
if len(assigned_agency)>1:
|
||||
raise ValidationError({"internal_error":f"Internal error? Found more than one assigned agency for the shipcall with ID {shipcall_id}. Found: {assigned_agency}"})
|
||||
assigned_agency = assigned_agency[0]
|
||||
|
||||
# determine, whether the assigned agency has set the BSMD-flag to allow BSMD users to edit their assigned shipcalls
|
||||
query = 'SELECT * FROM participant where (id = ?participant_id?)'
|
||||
agency_participant = execute_sql_query_standalone(query=query, param={"participant_id" : assigned_agency.participant_id}, command_type="single", model=Participant)
|
||||
if an_agency_is_assigned:
|
||||
assert isinstance(assigned_agency, Participant), f"expecting the assigency agency to be a Participant object. Found: {type(assigned_agency)}"
|
||||
assert isinstance(assigned_agency.flags, int), f"this method has currently only been developed with 'flags' being set as an integer. Found: {type(assigned_agency.flags)}"
|
||||
|
||||
assert isinstance(agency_participant.flags, int), f"this method has currently only been developed with 'flags' being set as an integer. Found: {type(agency_participant.flags)}"
|
||||
agency_has_bsmd_flag = agency_participant.flags==1 # once the flags are an IntFlag, change the boolean check to: (ParticipantFlag.BSMD in agency_participant.flags)
|
||||
# determine, whether the assigned agency has set the BSMD-flag to allow BSMD users to edit their assigned shipcalls
|
||||
agency_has_bsmd_flag = assigned_agency.flags==1 # once the flags are an IntFlag, change the boolean check to: (ParticipantFlag.BSMD in agency_participant.flags)
|
||||
|
||||
### USER authority ###
|
||||
# determine, whether the user is a) the assigned agency or b) a BSMD participant
|
||||
user_is_assigned_agency = (participant_id == assigned_agency.participant_id)
|
||||
user_is_assigned_agency = (user_participant_id == assigned_agency.participant_id)
|
||||
|
||||
# when the BSMD flag is set: the user must be either BSMD or the assigned agency
|
||||
# when the BSMD flag is not set: the user must be the assigned agency
|
||||
user_is_authorized = (user_is_bsmd or user_is_assigned_agency) if agency_has_bsmd_flag else user_is_assigned_agency
|
||||
user_is_authorized = (user_is_bsmd or user_is_assigned_agency) #if agency_has_bsmd_flag else user_is_assigned_agency
|
||||
|
||||
if not user_is_authorized:
|
||||
raise werkzeug.exceptions.Forbidden(f"PUT Requests for shipcalls can only be issued by an assigned AGENCY or BSMD users (if the special-flag is enabled). Assigned Agency: {assigned_agency} with Flags: {agency_participant.flags}") # Forbidden: 403
|
||||
raise werkzeug.exceptions.Forbidden(f"PUT Requests for shipcalls can only be issued by an assigned AGENCY or BSMD users (if the special-flag is enabled). Assigned Agency: {assigned_agency} with Flags: {assigned_agency.flags}") # Forbidden: 403
|
||||
|
||||
else:
|
||||
# when there is no assigned agency, only BSMD users can update the shipcall
|
||||
user_is_authorized = user_is_bsmd
|
||||
|
||||
if not user_is_authorized:
|
||||
if not user_is_bsmd:
|
||||
raise werkzeug.exceptions.Forbidden(f"PUT Requests for shipcalls can only be issued by an assigned AGENCY or BSMD users (if the special-flag is enabled). There is no assigned agency yet, so only BSMD users can change datasets.") # part of a pytest.raises. Forbidden: 403
|
||||
|
||||
return
|
||||
|
||||
@ -15,6 +15,7 @@ from BreCal.database.enums import ParticipantType, ParticipantFlag
|
||||
from BreCal.validators.input_validation_utils import check_if_user_is_bsmd_type, check_if_ship_id_is_valid, check_if_berth_id_is_valid, check_if_participant_ids_are_valid, check_if_participant_ids_and_types_are_valid, check_if_shipcall_id_is_valid, get_shipcall_id_dictionary, get_participant_type_from_user_data, get_participant_id_dictionary, check_if_participant_id_is_valid_standalone
|
||||
from BreCal.database.sql_queries import SQLQuery
|
||||
from BreCal.database.sql_handler import execute_sql_query_standalone
|
||||
from BreCal.database.sql_handler import get_assigned_participant_of_type
|
||||
from BreCal.validators.validation_base_utils import check_if_int_is_valid_flag, check_if_string_has_special_characters
|
||||
import werkzeug
|
||||
|
||||
@ -379,45 +380,68 @@ class InputValidationTimes():
|
||||
# identify the user's participant id
|
||||
user_participant_id = user_data["participant_id"]
|
||||
|
||||
""" # #TODO:
|
||||
First of all, this method is shared for PUT and DELETE requests.
|
||||
PUT) is based on the loadedModel
|
||||
DELETE) is based on the times_id
|
||||
Both of them share the {user_data}-argument
|
||||
|
||||
These arguments are used to obtain shipcall_id, participant_type (of the times entry) and times_assigned_participant
|
||||
|
||||
there should be the following authorization approaches
|
||||
a) the user has the participant ID of the assigned entry for a given role
|
||||
for this, we need:
|
||||
1) user_participant_id
|
||||
2) times_participant_type
|
||||
3) SPM: assigned participant of the respective type (times_assigned_participant)
|
||||
_ = get_assigned_participant_of_type(shipcall_id, participant_type=ParticipantType.WHATTYPE)
|
||||
|
||||
b) the user is the assigned agency (or the BSMD if allowed)
|
||||
for this, we need:
|
||||
1) assigned_agency
|
||||
assigned_agency = get_assigned_participant_of_type(shipcall_id, participant_type=ParticipantType.AGENCY)
|
||||
2) agency's flag
|
||||
assigned_agency.flags
|
||||
3) user_is_bsmd boolean
|
||||
"""
|
||||
|
||||
# commonly used in the PUT-request
|
||||
if loadedModel is not None:
|
||||
shipcall_id = loadedModel["shipcall_id"]
|
||||
participant_type = loadedModel["participant_type"]
|
||||
|
||||
# get the matching entry from the shipcall participant map. Raise an error, when there is no match.
|
||||
participant_id_of_times_dataset = InputValidationTimes.get_participant_id_from_shipcall_participant_map(shipcall_id, participant_type)
|
||||
(shipcall_id, times_assigned_participant) = InputValidationTimes.prepare_authority_check_for_put_request(loadedModel)
|
||||
|
||||
# commonly used in the DELETE-request
|
||||
if times_id is not None:
|
||||
if pdata is None: # regular behavior. pdata is only defined in unit tests.
|
||||
# perform an SQL query. Creates a pooled connection internally, queries the database, then closes the connection.
|
||||
query = "SELECT participant_id, participant_type, shipcall_id FROM times WHERE id = ?id?"
|
||||
pdata = execute_sql_query_standalone(query=query, param={"id":times_id}, pooledConnection=None)
|
||||
# #TODO_refactor:
|
||||
(shipcall_id, times_assigned_participant) = InputValidationTimes.prepare_authority_check_for_delete_request(times_id, pdata)
|
||||
|
||||
# extracts the participant_id from the first matching entry, if applicable
|
||||
if not len(pdata)>0:
|
||||
# this case is usually covered by the InputValidationTimes.check_if_entry_is_already_deleted method already
|
||||
raise ValidationError({"times_id":f"Unknown times_id. Could not find a matching entry for ID: {times_id}"})
|
||||
else:
|
||||
participant_type = pdata[0].get("participant_type")
|
||||
shipcall_id = pdata[0].get("shipcall_id")
|
||||
# get the matching entry from the shipcall participant map, where the role matches. Raise an error, when there is no match.
|
||||
assigned_agency = get_assigned_participant_of_type(shipcall_id, participant_type=ParticipantType.AGENCY)
|
||||
|
||||
# get the matching entry from the shipcall participant map. Raise an error, when there is no match.
|
||||
participant_id_of_times_dataset = pdata[0].get("participant_id")
|
||||
# participant_id_of_times_dataset = InputValidationTimes.get_participant_id_from_shipcall_participant_map(shipcall_id, participant_type)
|
||||
|
||||
# when the user's participant id is different from the times dataset, an exception is raised
|
||||
if user_participant_id != participant_id_of_times_dataset:
|
||||
# for some AGENCY participants, users with the BSMD flag may also edit the datasets
|
||||
special_case__bsmd_may_edit_agency_dataset = InputValidationTimes.check_if_bsmd_may_edit_agency_dataset(user_participant_id, participant_id_of_times_dataset, participant_type)
|
||||
if special_case__bsmd_may_edit_agency_dataset:
|
||||
return
|
||||
else:
|
||||
raise ValidationError({"user_participant_type":f"The dataset may only be changed by a user belonging to the same participant group as the times dataset is referring to. User participant_id: {user_participant_id}; Dataset participant_id: {participant_id_of_times_dataset}"})
|
||||
return
|
||||
# a) the user has the participant ID of the assigned entry for a given role
|
||||
user_is_assigned_role = user_participant_id == times_assigned_participant.id
|
||||
|
||||
# b) the user is the assigned agency
|
||||
user_is_assigned_agency = user_participant_id == assigned_agency.id
|
||||
|
||||
# c) the user is BSMD, if the assigned agency allows that
|
||||
assigned_agency_has_bsmd_flag = assigned_agency.flags == 1
|
||||
user_is_bsmd_type = check_if_user_is_bsmd_type(user_data={"participant_id":user_participant_id})
|
||||
user_is_bsmd_and_assigned_agency_has_flag = assigned_agency_has_bsmd_flag & user_is_bsmd_type
|
||||
|
||||
if user_is_assigned_role:
|
||||
return
|
||||
|
||||
elif user_is_assigned_agency:
|
||||
return
|
||||
|
||||
elif user_is_bsmd_and_assigned_agency_has_flag:
|
||||
return
|
||||
|
||||
else:
|
||||
raise ValidationError({"user_participant_type": f"The dataset may only be changed by a user belonging to the same participant group as the times dataset is referring to. Alternatively, the assigned agency may edit and delete the dataset. As a special case, BSMD users may edit and delete times datasets, when the assigned agency allows that. User participant_id: {user_participant_id}; Dataset participant_id: {participant_id_of_times_dataset}"})
|
||||
|
||||
@staticmethod
|
||||
def get_participant_id_from_shipcall_participant_map(shipcall_id:int, participant_type:int, spm_shipcall_data=None)->int:
|
||||
def get_participant_id_from_shipcall_participant_map(shipcall_id:typing.Optional[int], participant_type:int, spm_shipcall_data=None)->int:
|
||||
"""use shipcall_id and participant_type to identify the matching participant_id"""
|
||||
if shipcall_id is None:
|
||||
raise ValidationError({"shipcall_id":f"Could not find a referenced shipcall_id within the request."})
|
||||
@ -479,6 +503,42 @@ class InputValidationTimes():
|
||||
|
||||
has_bsmd_flag = ParticipantFlag.BSMD in [ParticipantFlag(participant.get("flags"))]
|
||||
return has_bsmd_flag
|
||||
|
||||
@staticmethod
|
||||
def prepare_authority_check_for_put_request(loadedModel)->typing.Tuple[int,Participant]:
|
||||
"""extracts the loadedModel to obtain relevant arguments"""
|
||||
shipcall_id = loadedModel["shipcall_id"]
|
||||
participant_type = loadedModel["participant_type"]
|
||||
|
||||
# get the matching entry from the shipcall participant map, where the role matches. Raise an error, when there is no match.
|
||||
times_assigned_participant = get_assigned_participant_of_type(shipcall_id, participant_type=participant_type)
|
||||
|
||||
if times_assigned_participant is None:
|
||||
raise ValidationError({"participant_type":"the requested participant type is not assigned to the shipcall."})
|
||||
return (shipcall_id, times_assigned_participant)
|
||||
|
||||
@staticmethod
|
||||
def prepare_authority_check_for_delete_request(times_id, pdata=None)->typing.Tuple[int,Participant]:
|
||||
if pdata is None: # regular behavior. pdata is only defined in unit tests.
|
||||
# perform an SQL query. Creates a pooled connection internally, queries the database, then closes the connection.
|
||||
query = "SELECT participant_id, participant_type, shipcall_id FROM times WHERE id = ?id?"
|
||||
pdata = execute_sql_query_standalone(query=query, param={"id":times_id}, pooledConnection=None)
|
||||
|
||||
# extracts the participant_id from the first matching entry, if applicable
|
||||
if not len(pdata)>0:
|
||||
# this case is usually covered by the InputValidationTimes.check_if_entry_is_already_deleted method already
|
||||
raise ValidationError({"times_id":f"Unknown times_id. Could not find a matching entry for ID: {times_id}"})
|
||||
else:
|
||||
participant_type = pdata[0].get("participant_type")
|
||||
shipcall_id = pdata[0].get("shipcall_id")
|
||||
|
||||
# get the matching entry from the shipcall participant map, where the role matches. Raise an error, when there is no match.
|
||||
times_assigned_participant = get_assigned_participant_of_type(shipcall_id, participant_type=participant_type)
|
||||
|
||||
if times_assigned_participant is None:
|
||||
raise ValidationError({"participant_type":"the requested participant type is not assigned to the shipcall."})
|
||||
return (shipcall_id, times_assigned_participant)
|
||||
|
||||
|
||||
|
||||
def deprecated_build_post_data_type_dependent_required_fields_dict()->dict[ShipcallType,dict[ParticipantType,typing.Optional[list[str]]]]:
|
||||
|
||||
Reference in New Issue
Block a user