import typing
import json
import datetime
from abc import ABC, abstractmethod
from marshmallow import ValidationError
from string import ascii_letters, digits

from BreCal.schemas.model import Ship, Shipcall, Berth, User, Participant, ShipcallType, ShipcallParticipantMap
from BreCal.impl.participant import GetParticipant
from BreCal.impl.ships import GetShips
from BreCal.impl.berths import GetBerths

from BreCal.database.enums import ParticipantType, ParticipantFlag
from BreCal.database.sql_utils import get_shipcall_data_for_id
from BreCal.validators.input_validation_utils import (
    check_if_user_is_bsmd_type,
    check_if_ship_id_is_valid,
    check_if_berth_id_is_valid,
    check_if_participant_ids_are_valid,
    check_if_participant_ids_and_types_are_valid,
    get_shipcall_id_dictionary,
    get_participant_type_from_user_data,
)
from BreCal.database.sql_handler import get_assigned_participant_of_type
from BreCal.database.sql_handler import execute_sql_query_standalone
from BreCal.validators.validation_base_utils import check_if_int_is_valid_flag
from BreCal.validators.validation_base_utils import check_if_string_has_special_characters
from BreCal.database.sql_queries import SQLQuery
import werkzeug


class InputValidationShipcall():
    """
    This class combines a complex set of individual input validation functions into a joint object.
    It uses static methods, so the object does not need to be instantiated, but functions can be called immediately.

    Example:
        InputValidationShipcall.evaluate_post_data(user_data, loadedModel, content)
        InputValidationShipcall.evaluate_put_data(user_data, loadedModel, content)

    When the data violates one of the rules, a marshmallow.ValidationError is raised, which details the issues.
    Authorization violations of PUT-requests raise werkzeug.exceptions.Forbidden (HTTP 403) instead.
    """
    def __init__(self) -> None:
        pass

    @staticmethod
    def evaluate_post_data(user_data:dict, loadedModel:dict, content:dict):
        """
        this function combines multiple validation functions to verify data, which is sent to the API as a shipcall's POST-request

        checks:
            1. permission: only participants that belong to the BSMD or AGENCY groups are allowed to POST shipcalls
            2. reference checks: all referred objects within the Shipcall must exist
            3. existence of required fields
            4. reasonable values: validates the values within the Shipcall

        raises:
            marshmallow.ValidationError: as soon as the first rule is violated
        """
        # check for permission (only BSMD-type or AGENT-type participants)
        InputValidationShipcall.check_user_is_bsmd_or_agent_type(user_data)

        # check references (referred IDs must exist)
        InputValidationShipcall.check_referenced_ids(loadedModel)

        # POST-request only: check the existence of required fields based on the ShipcallType
        InputValidationShipcall.check_required_fields_exist_based_on_type(loadedModel, content)

        # POST-request only: check the existence of a participant list, when the user is of type agency
        InputValidationShipcall.check_participant_list_not_empty_when_user_is_agency(loadedModel)

        # check for reasonable values in the shipcall fields
        InputValidationShipcall.check_shipcall_values(loadedModel, content, forbidden_keys=["evaluation", "evaluation_message"]) # "canceled"
        return

    @staticmethod
    def evaluate_put_data(user_data:dict, loadedModel:dict, content:dict):
        """
        this function combines multiple validation functions to verify data, which is sent to the API as a shipcall's PUT-request

        checks:
            1. user's authority (see check_user_is_authorized_for_put_request)
            2. existence of required fields
            3. all value-rules of the POST evaluation
            4. a canceled shipcall may not be changed

        raises:
            marshmallow.ValidationError: when a value rule is violated
            werkzeug.exceptions.Forbidden: when the user is not authorized to change the shipcall
        """
        # check, whether the shipcall_id exists
        InputValidationShipcall.check_shipcall_id_exists(loadedModel)

        # check, whether an agency is listed in the shipcall-participant-map
        # deprecated: InputValidationShipcall.check_agency_in_shipcall_participant_map(user_data, loadedModel, content)

        # check, whether the user belongs to the assigned agency or to BSMD
        InputValidationShipcall.check_user_is_authorized_for_put_request(user_data, loadedModel, content)

        # the ID field is required, all missing fields will be ignored in the update
        InputValidationShipcall.check_required_fields_of_put_request(content)

        # check the referenced IDs
        InputValidationShipcall.check_referenced_ids(loadedModel)

        # check for reasonable values in the shipcall fields and checks for forbidden keys.
        InputValidationShipcall.check_shipcall_values(loadedModel, content, forbidden_keys=["evaluation", "evaluation_message"], is_put_data=True)

        # a canceled shipcall cannot be changed
        # Note: 'canceled' is allowed in PUT-requests, if it is not already set (which is checked by InputValidationShipcall.check_shipcall_is_canceled)
        InputValidationShipcall.check_shipcall_is_canceled(loadedModel, content)
        return

    @staticmethod
    def exists_shipcall_by_id(id:int):
        """returns True, when get_shipcall_data_for_id finds an entry for the provided shipcall id"""
        return get_shipcall_data_for_id(id) is not None

    @staticmethod
    def check_shipcall_values(loadedModel:dict, content:dict, forbidden_keys:typing.Optional[list]=None, is_put_data:bool=False):
        """
        individually checks each value provided in the loadedModel/content.
        This function validates, whether the values are reasonable.

        Also, some data may not be set in a POST-request.

        options:
            forbidden_keys: keys, which must not be set in 'content'. Defaults to ["evaluation", "evaluation_message"]
            is_put_data: bool. Some validation rules do not apply to POST data, but apply to PUT data. This flag separates the two.

        raises:
            marshmallow.ValidationError: when one of the values violates a rule
        """
        # Note: BreCal.schemas.model.ShipcallSchema has an internal validation, which the marshmallow library provides. This is used
        # to verify values individually, when the schema is loaded with data.
        # This function focuses on more complex input validation, which may require more sophisticated methods

        # loadedModel fills missing values, sometimes using optional values. Hence, the 'content'-variable is preferred for some of these verifications
        if forbidden_keys is None:
            forbidden_keys = ["evaluation", "evaluation_message"]

        # voyage shall not contain special characters (an explicit null is treated like a missing value)
        voyage = content.get("voyage", "")
        voyage_str_is_invalid = check_if_string_has_special_characters(text="" if voyage is None else voyage)
        if voyage_str_is_invalid:
            raise ValidationError({"voyage":f"there are invalid characters in the 'voyage'-string. Please use only digits and ASCII letters. Allowed: {ascii_letters+digits}. Found: {content.get('voyage')}"})

        # the 'flags' integer must be valid
        # NOTE(review): this raises when the helper returns a truthy value. Judging by its name, the helper
        # reports *valid* flags, which would make this condition inverted -- confirm against
        # BreCal.validators.validation_base_utils.check_if_int_is_valid_flag before changing it.
        flags_value = content.get("flags", 0)
        if check_if_int_is_valid_flag(flags_value, enum_object=ParticipantFlag):
            raise ValidationError({"flags":"incorrect value provided for 'flags'. Must be a valid combination of the flags."})

        existing_shipcall = None
        if is_put_data:
            existing_shipcall = InputValidationShipcall.get_shipcall_by_id(loadedModel.get("id"))

            # the type of a shipcall may not be changed. It can only be set with the initial POST-request.
            InputValidationShipcall.check_shipcall_type_is_unchanged(loadedModel, existing_shipcall)

        InputValidationShipcall.check_times_are_in_future(loadedModel, content, existing_shipcall)

        # some arguments must not be provided
        InputValidationShipcall.check_forbidden_arguments(content, forbidden_keys=forbidden_keys)
        return

    @staticmethod
    def check_agency_in_shipcall_participant_map(user_data:dict, loadedModel:dict, content:dict, spm_shipcall_data:typing.Optional[list]=None):
        """
        DEPRECATED: this method unconditionally raises an Exception. The former implementation is kept below for reference only.

        When the request is issued by a user of type 'AGENCY', there must be special caution. Agency users cannot self-assign as participants
        of a shipcall. Further, when no AGENCY is assigned to the shipcall, a PUT-request is not feasible. In those cases, the
        BSMD must first assign an agency, before a PUT-request can assign further participants.

        Upon violation, this method issues 'Forbidden'-Exceptions with HTTP status code 403. There are four reasons for violations:
            a) an agency tries to self-assign for a shipcall
            b) there is no assigned agency for the current shipcall
            c) an agency is assigned, but the current agency-user belongs to a different participant_id
            d) the user must be of ParticipantType BSMD or AGENCY

        args:
            spm_shipcall_data:
                a list of entries obtained from the ShipcallParticipantMap. These are deserialized dictionaries.
                e.g., [{'participant_id': 136, 'type': 8}, ]
        """
        raise Exception("deprecated")

        # ---- unreachable from here on (kept as a reference of the former rules) ----
        if spm_shipcall_data is None:
            # read the ShipcallParticipantMap entry of the current shipcall_id. This is used within the input validation of a PUT request
            spm_shipcall_data = execute_sql_query_standalone(
                # #TODO_refactor: place this within the SQLQuery object
                query = "SELECT participant_id, type FROM shipcall_participant_map WHERE shipcall_id=?shipcall_id?",
                param={"shipcall_id":loadedModel["id"]},
                pooledConnection=None
            )

        # which role should be set by the PUT request? If the agency is about to be set, an error will be created
        # read the user data from the JWT token (set when login is performed)
        user_type = get_participant_type_from_user_data(user_data) # decode JWT -> get 'type' value (guarantees to convert user type into an IntFlag)
        assert isinstance(user_type, ParticipantType)

        # select the matching entries from the ShipcallParticipantMap
        agency_entries = [spm_entry for spm_entry in spm_shipcall_data if int(spm_entry.get("type"))==int(ParticipantType.AGENCY)] # find all entries of type AGENCY (there should be at max. 1)

        # when the request stems from an AGENCY user, and the user wants to PUT an AGENCY role, the request should fail
        # boolean: check, whether any of the assigned participants is of type AGENCY
        types = [participant.get("type",0) for participant in loadedModel["participants"]] # readout the participants from the loadedModel, which shall be assigned by the PUT request
        any_type_is_agency = any([int(type_) == int(ParticipantType.AGENCY) for type_ in types]) # check, whether *any* of the participants is an agency

        if not ((ParticipantType.AGENCY in user_type) or (ParticipantType.BSMD in user_type)):
            # user not AGENCY or BSMD
            raise werkzeug.exceptions.Forbidden("PUT Requests for shipcalls can only be issued by AGENCY or BSMD users.") # Forbidden: 403

        # Placeholder: when a user is an AGENCY,
        if (ParticipantType.AGENCY in user_type) & (any_type_is_agency):
            # self-assignment: agency sets agency participant
            raise werkzeug.exceptions.Forbidden("An agency cannot self-register for a shipcall. The request is issued by an agency-user and tries to assign an AGENCY as the participant of the shipcall.") # Forbidden: 403

        if len(agency_entries)>0:
            # agency participant exists: participant id must be the same as shipcall participant map entry
            matching_spm_entry = [spm_entry for spm_entry in spm_shipcall_data if (spm_entry.get("participant_id")==user_data["id"]) & (int(spm_entry.get("type"))==int(ParticipantType.AGENCY))]

            if len(matching_spm_entry)==0:
                # An AGENCY was found, but a different participant_id is assigned to that AGENCY
                raise werkzeug.exceptions.Forbidden(f"A different participant_id is assigned as the AGENCY of this shipcall. Provided ID: {user_data.get('id')}, Assigned ShipcallParticipantMap: {agency_entries}") # Forbidden: 403
            else:
                # a matching agency was found: no violation
                return

        else:
            # agency participant does not exist: there is no assigned agency role for the shipcall {shipcall_id}
            raise werkzeug.exceptions.Forbidden(f"There is no assigned agency for this shipcall. Shipcall ID: {loadedModel['id']}") # Forbidden: 403
        return

    @staticmethod
    def check_user_is_bsmd_or_agent_type(user_data):
        """
        check, whether the user belongs to a participant, which is of type ParticipantType.BSMD or ParticipantType.AGENCY
        as ParticipantType is an IntFlag, a user belonging to multiple groups is properly evaluated.

        raises:
            marshmallow.ValidationError: when the user is neither of type BSMD nor AGENCY
        """
        # use the decoded JWT token and extract the participant type
        participant_type = get_participant_type_from_user_data(user_data)

        is_bsmd = (ParticipantType.BSMD in participant_type)
        is_agency = (ParticipantType.AGENCY in participant_type)

        if not (is_bsmd or is_agency):
            raise ValidationError({"participant_type":f"current user must be either of participant type BSMD or AGENCY. Cannot post or put shipcalls. Found user data: {user_data} and participant_type: {participant_type}"})
        return

    @staticmethod
    def check_referenced_ids(loadedModel):
        """
        check, whether the referenced entries exist (e.g., when a Ship ID is referenced, but does not exist, the validation fails)

        The ship id, both berth ids (together with the port id) and the participant list are handed to the
        respective check_if_*-helpers. The first helper that reports an invalid reference leads to a ValidationError.
        """
        # #TODO: arrival and departure berth id should be coupled with the shipcall type. One shall not provide
        # arrival berth id when the shipcall type is departure or vice versa.
        # a similar logic has already been implemented to the eta/etd or for the operation windows

        # get all IDs from the loadedModel
        ship_id = loadedModel.get("ship_id", None)
        arrival_berth_id = loadedModel.get("arrival_berth_id", None)
        departure_berth_id = loadedModel.get("departure_berth_id", None)
        port_id = loadedModel.get("port_id", None)
        participants = loadedModel.get("participants",[])

        valid_ship_id = check_if_ship_id_is_valid(ship_id=ship_id)
        if not valid_ship_id:
            raise ValidationError({"ship_id":f"provided an invalid ship id, which is not found in the database: {ship_id}"})

        valid_arrival_berth_id = check_if_berth_id_is_valid(berth_id=arrival_berth_id, port_id=port_id)
        if not valid_arrival_berth_id:
            raise ValidationError({"arrival_berth_id":f"provided an invalid arrival berth id, which is not found in the database: {arrival_berth_id}, or the berth is not assigned to the port: {port_id}"})

        valid_departure_berth_id = check_if_berth_id_is_valid(berth_id=departure_berth_id, port_id=port_id)
        if not valid_departure_berth_id:
            raise ValidationError({"departure_berth_id":f"provided an invalid departure berth id, which is not found in the database: {departure_berth_id}, or the berth is not assigned to the port: {port_id}"})

        valid_participant_ids = check_if_participant_ids_are_valid(participants=participants, port_id=port_id)
        if not valid_participant_ids:
            raise ValidationError({"participants":f"one of the provided participant ids is invalid. Could not find one of these in the database: {participants}"})

        valid_participant_types = check_if_participant_ids_and_types_are_valid(participants=participants)
        if not valid_participant_types: # #TODO: according to Daniel, there may eventually be multi-assignment of participants for the same role
            raise ValidationError({"participants":"every participant id and type should be listed only once. Found multiple entries for one of the participants."})

    @staticmethod
    def check_shipcall_type_is_unchanged(loadedModel:dict, existing_shipcall:object):
        """
        compares the 'type' of the loadedModel with the 'type' attribute of the stored shipcall.

        raises:
            marshmallow.ValidationError: when the two types differ
        """
        if int(loadedModel["type"]) != int(existing_shipcall.type):
            raise ValidationError({"type":"The shipcall type may only be set in the initial POST-request. Afterwards, changing the shipcall type is not allowed."}) # @pytest.raises
        return

    @staticmethod
    def get_shipcall_by_id(shipcall_id:int):
        """reads a single shipcall via SQLQuery.get_shipcall_by_id and returns the result of the standalone query (model: Shipcall)"""
        query = SQLQuery.get_shipcall_by_id()
        shipcall = execute_sql_query_standalone(query=query, model=Shipcall, param={"id":shipcall_id}, command_type="single")
        return shipcall

    @staticmethod
    def check_forbidden_arguments(content:dict, forbidden_keys=None):
        """
        verifies, that none of the forbidden keys carries a value in the request's content.
        A key, which is missing or explicitly set to null, is accepted.

        options:
            forbidden_keys: iterable of keys. Defaults to ["evaluation", "evaluation_message"]

        raises:
            marshmallow.ValidationError: for the first forbidden key with a non-null value
        """
        if forbidden_keys is None:
            forbidden_keys = ["evaluation", "evaluation_message"]

        for forbidden_key in forbidden_keys:
            value = content.get(forbidden_key, None)
            if value is not None:
                raise ValidationError({"forbidden_key":f"'{forbidden_key}' may not be set on POST. Found: {value}"})
        return

    @staticmethod
    def check_required_fields_exist_based_on_type(loadedModel:dict, content:dict):
        """
        depending on the ShipcallType, some fields are *required* in a POST-request

            always:    ship_id and a defined type
            arrival:   eta & arrival_berth_id
            departure: etd & departure_berth_id
            shifting:  eta, etd, arrival_berth_id & departure_berth_id

        raises:
            marshmallow.ValidationError: when a required field is missing or the type is unknown
        """
        type_ = loadedModel.get("type", int(ShipcallType.undefined))
        ship_id = content.get("ship_id", None)
        eta = content.get("eta", None)
        etd = content.get("etd", None)
        arrival_berth_id = content.get("arrival_berth_id", None)
        departure_berth_id = content.get("departure_berth_id", None)

        if ship_id is None:
            raise ValidationError({"ship_id":"providing 'ship_id' is mandatory. Missing key!"})

        if int(type_)==int(ShipcallType.undefined):
            raise ValidationError({"type":"providing 'type' is mandatory. Missing key!"})

        # arrival: arrival_berth_id & eta must exist
        elif int(type_)==int(ShipcallType.arrival):
            if eta is None:
                raise ValidationError({"eta":"providing 'eta' is mandatory. Missing key!"})

            if arrival_berth_id is None:
                raise ValidationError({"arrival_berth_id":"providing 'arrival_berth_id' is mandatory. Missing key!"})

        # departure: departure_berth_id and etd must exist
        elif int(type_)==int(ShipcallType.departure):
            if etd is None:
                raise ValidationError({"etd":"providing 'etd' is mandatory. Missing key!"})

            if departure_berth_id is None:
                raise ValidationError({"departure_berth_id":"providing 'departure_berth_id' is mandatory. Missing key!"})

        # shifting: arrival_berth_id, departure_berth_id, eta and etd must exist
        elif int(type_)==int(ShipcallType.shifting):
            if (eta is None) or (etd is None):
                raise ValidationError({"eta_or_etd":"providing 'eta' and 'etd' is mandatory. Missing one of those keys!"})
            if (arrival_berth_id is None) or (departure_berth_id is None):
                raise ValidationError({"arrival_berth_id_or_departure_berth_id":"providing 'arrival_berth_id' & 'departure_berth_id' is mandatory. Missing key!"})

        else:
            raise ValidationError({"type":"incorrect 'type' provided!"})
        return

    @staticmethod
    def _get_time_in_a_year(now:typing.Optional[datetime.datetime]=None) -> datetime.datetime:
        """
        returns the datetime exactly one calendar year after 'now' (default: datetime.datetime.now()).
        The 29th of February has no counterpart in the following year and is mapped to the 28th of February.
        """
        now = datetime.datetime.now() if now is None else now
        try:
            return now.replace(year=now.year + 1)
        except ValueError:
            # only reachable on the 29th of February: the following year is never a leap year
            return now.replace(year=now.year + 1, day=28)

    @staticmethod
    def check_times_are_in_future(loadedModel:dict, content:dict, existing_shipcall:object):
        """
        Dates should be in the future. Depending on the ShipcallType, specific values should be checked
        Performs datetime checks in the loadedModel (datetime.datetime objects).

        args:
            existing_shipcall: the stored shipcall of a PUT-request or None for a POST-request.
                When a stored shipcall is provided, the times are only validated, if they differ from the
                stored values. Without a stored shipcall, the provided times are always validated.

        raises:
            marshmallow.ValidationError: when one of the time rules is violated
        """
        # obtain the current datetime to check, whether the provided values are after ref time.
        # The reference is shifted by one day into the past, which tolerates slightly outdated values.
        time_ref = datetime.datetime.now() - datetime.timedelta(days=1)

        type_ = loadedModel.get("type", ShipcallType.undefined.name)
        if isinstance(type_, str): # convert the name string to a ShipcallType data model
            type_ = ShipcallType[type_]

        # #TODO: *if* this is a PUT-request, one shall load the existing values from the database, overwrite the none-null
        # values *and then* perform the validation.
        # Example: eta and etd are set in the POST-request. User wants to execute a PUT-request with only the etd.
        # Internally, the backend must still verify, that eta < etd!
        # Same applies to tidal_window_from & tidal_window_to
        eta = loadedModel.get("eta")
        etd = loadedModel.get("etd")
        tidal_window_from = loadedModel.get("tidal_window_from", None)
        tidal_window_to = loadedModel.get("tidal_window_to", None)

        if existing_shipcall is None:
            # no stored values to compare against (POST-request): everything that is provided must be validated
            times_changed = True
            tidal_window_changed = True
        else:
            times_changed = (eta != existing_shipcall.eta) or (etd != existing_shipcall.etd)
            tidal_window_changed = (
                (tidal_window_from != existing_shipcall.tidal_window_from)
                or (tidal_window_to != existing_shipcall.tidal_window_to)
            )

        if times_changed:
            # Estimated arrival or departure times
            InputValidationShipcall.check_times_in_future_based_on_type(type_, time_ref, eta, etd)

        if tidal_window_changed:
            # Tidal Window
            InputValidationShipcall.check_tidal_window_in_future(type_, time_ref, tidal_window_from, tidal_window_to)
        return

    @staticmethod
    def check_times_in_future_based_on_type(type_, time_ref, eta, etd):
        """
        checks, whether the ETA & ETD times are in the future (after 'time_ref') and at most one year ahead.
        based on the type, this function checks:
            arrival: eta (etd must not be set)
            departure: etd (eta must not be set)
            shifting: eta & etd (both are required and etd must be before eta)

        When both, eta and etd, are None, there is nothing to verify.

        raises:
            marshmallow.ValidationError: when one of the rules is violated
        """
        if (eta is None) and (etd is None):
            return

        time_in_a_year = InputValidationShipcall._get_time_in_a_year()

        if type_ is None:
            raise ValidationError({"type":"when providing 'eta' or 'etd', one must provide the type of the shipcall, so the datetimes can be verified."})

        if not isinstance(type_, (int, ShipcallType)):
            type_ = ShipcallType[type_]

        # #TODO: properly handle what happens, when eta or etd (or both) are None
        if int(type_)==int(ShipcallType.undefined):
            raise ValidationError({"type":"providing 'type' is mandatory. Missing key!"})

        elif int(type_)==int(ShipcallType.arrival):
            if eta is None: # null values -> no violation
                return

            if not eta > time_ref:
                raise ValidationError({"eta":f"'eta' is too far in the past. Incorrect datetime provided. Current Time: {time_ref}. ETA: {eta}."})
            if etd is not None:
                raise ValidationError({"etd":"'etd' should not be set when the shipcall type is 'arrival'."})
            if eta > time_in_a_year:
                raise ValidationError({"eta":f"'eta' is more than a year in the future. ETA: {eta}."})

        elif int(type_)==int(ShipcallType.departure):
            if etd is None: # null values -> no violation
                return

            if not etd > time_ref:
                raise ValidationError({"etd":f"'etd' is too far in the past. Incorrect datetime provided. Current Time: {time_ref}. ETD: {etd}."})
            if eta is not None:
                raise ValidationError({"eta":"'eta' should not be set when the shipcall type is 'departure'."})
            if etd > time_in_a_year:
                raise ValidationError({"etd":f"'etd' is more than a year in the future. ETD: {etd}."})

        elif int(type_)==int(ShipcallType.shifting):
            if (eta is None) or (etd is None):
                # for PUT-requests, a user could try modifying only 'eta' or only 'etd'. To simplify the
                # rules, a user is only allowed to provide *both* values.
                # (the case that both values are None has already returned at the top of this method)
                raise ValidationError({"eta_or_etd":"For shifting shipcalls one should always provide, both, eta and etd."})

            if (not eta > time_ref) or (not etd > time_ref):
                raise ValidationError({"eta_or_etd":f"'eta' and 'etd' is too far in the past. Incorrect datetime provided. Current Time: {time_ref}. ETA: {eta}. ETD: {etd}"})
            if (not etd < eta):
                raise ValidationError({"eta_or_etd":f"The estimated time of departure ('etd') must take place *before the estimated time of arrival ('eta'). The ship cannot arrive, before it has departed. Found: ETD: {etd}, ETA: {eta}"})
            if eta > time_in_a_year:
                raise ValidationError({"eta":f"'eta' is more than a year in the future. ETA: {eta}."})
            if etd > time_in_a_year:
                raise ValidationError({"etd":f"'etd' is more than a year in the future. ETD: {etd}."})
        return

    @staticmethod
    def check_tidal_window_in_future(type_, time_ref, tidal_window_from, tidal_window_to):
        """
        checks the tidal window: each provided boundary must not be before 'time_ref' and must be at most
        one year ahead. 'tidal_window_to' must not be before 'tidal_window_from' and either both or none of
        the boundaries must be provided. The 'type_' argument is currently not evaluated.

        raises:
            marshmallow.ValidationError: when one of the rules is violated
        """
        time_in_a_year = InputValidationShipcall._get_time_in_a_year()

        if tidal_window_to is not None:
            if not tidal_window_to >= time_ref:
                raise ValidationError({"tidal_window_to":"'tidal_window_to' must be in the future. Incorrect datetime provided."})
            if tidal_window_to > time_in_a_year:
                raise ValidationError({"tidal_window_to":f"'tidal_window_to' is more than a year in the future. Found: {tidal_window_to}."})

        if tidal_window_from is not None:
            if not tidal_window_from >= time_ref:
                raise ValidationError({"tidal_window_from":"'tidal_window_from' must be in the future. Incorrect datetime provided."})
            if tidal_window_from > time_in_a_year:
                raise ValidationError({"tidal_window_from":f"'tidal_window_from' is more than a year in the future. Found: {tidal_window_from}."})

        if (tidal_window_to is not None) and (tidal_window_from is not None):
            if tidal_window_to < tidal_window_from:
                raise ValidationError({"tidal_window_to_or_tidal_window_from":f"'tidal_window_to' must take place after 'tidal_window_from'. Incorrect datetime provided. Found 'tidal_window_to': {tidal_window_to}, 'tidal_window_from': {tidal_window_from}."})

        if (tidal_window_to is None) != (tidal_window_from is None):
            # exactly one of the two boundaries is provided
            raise ValidationError({"tidal_window_to_or_tidal_window_from":"'tidal_window_to' and 'tidal_window_from' must both be provided."})
        return

    @staticmethod
    def check_participant_list_not_empty_when_user_is_agency(loadedModel):
        """
        For each POST request, one of the participants in the list must be assigned as a ParticipantType.AGENCY

        raises:
            marshmallow.ValidationError: when none of the participants in the loadedModel is of type AGENCY
        """
        participants = loadedModel.get("participants", [])
        # the list is built completely, so that every participant's 'type' is converted to a ParticipantType
        is_agency_participant = [ParticipantType.AGENCY in ParticipantType(participant.get("type")) for participant in participants]

        if not any(is_agency_participant):
            raise ValidationError({"participants":f"One of the assigned participants *must* be of type 'ParticipantType.AGENCY'. Found list of participants: {participants}"})
        return

    @staticmethod
    def check_shipcall_is_canceled(loadedModel, content):
        """
        a shipcall, which is already stored as canceled, may not be changed anymore.

        raises:
            marshmallow.ValidationError: when the stored shipcall has a truthy 'canceled' value
        """
        # read the shipcall_id from the PUT data
        shipcall_id = loadedModel.get("id")

        # get all shipcalls in the database
        shipcalls = get_shipcall_id_dictionary()

        # search for the matching shipcall in the database
        shipcall = shipcalls.get(shipcall_id,{})

        # if the *existing* shipcall in the database is canceled, it may not be changed
        if shipcall.get("canceled", False):
            raise ValidationError({"canceled":f"The shipcall with id '{shipcall_id}' is canceled. A canceled shipcall may not be changed."})
        return

    @staticmethod
    def check_required_fields_of_put_request(content:dict):
        """a PUT-request must provide the 'id' of the shipcall it refers to. Otherwise, a ValidationError is raised."""
        shipcall_id = content.get("id", None)
        if shipcall_id is None:
            raise ValidationError({"id":"A PUT request requires an 'id' to refer to."})

    @staticmethod
    def check_shipcall_id_exists(loadedModel):
        """simply checks, whether the defined shipcall ID exists in the database. Otherwise, a PUT-request must fail."""
        shipcall_id = loadedModel.get("id")
        if shipcall_id is None:
            raise ValidationError({"id":"a shipcall id must be provided"})

        query = 'SELECT * FROM shipcall where (id = ?shipcall_id?)'
        shipcalls = execute_sql_query_standalone(query=query, model=Shipcall, param={"shipcall_id" : shipcall_id})
        if not shipcalls:
            raise ValidationError({"id":f"unknown shipcall_id. There are no shipcalls with the ID {shipcall_id}"})
        return

    @staticmethod
    def check_user_is_authorized_for_put_request(user_data:dict, loadedModel:dict, content:dict, shipcall_participant_map:typing.Optional[list[ShipcallParticipantMap]]=None):
        """
        This method verifies, whether a user is authorized to create a PUT-request for shipcalls.
        To be authorized, a user should either
            a) belong to the ASSIGNED agency participant group
            b) belong to a BSMD participant

        Note: the originally planned restriction of b) to agencies, which have enabled the BSMD bit flag, is
        currently *not* enforced. Every BSMD user is authorized.

        When there is not yet an assigned agency for the respective shipcall, only BSMD users are authorized.
        This mechanism prevents self-assignment of an agency to arbitrary shipcalls.

        args:
            shipcall_participant_map: optional list of map entries. When omitted, the assigned agency is read
                via get_assigned_participant_of_type.

        raises:
            werkzeug.exceptions.Forbidden: when the user is not authorized (HTTP 403)
            marshmallow.ValidationError: when the provided map contains more than one agency
        """
        ### preparation ###
        # use the decoded JWT token and extract the participant type & participant id
        user_participant_id = user_data.get("participant_id")
        participant_type = get_participant_type_from_user_data(user_data)
        user_is_bsmd = (ParticipantType.BSMD in participant_type)

        # get the shipcall id
        shipcall_id = loadedModel.get("id")

        ### AGENCY in SPM ###
        # determine, who is assigned as the agency for the shipcall
        if shipcall_participant_map is None:
            assigned_agency = get_assigned_participant_of_type(shipcall_id, participant_type=ParticipantType.AGENCY)
            an_agency_is_assigned = assigned_agency is not None
        else:
            # Agency assigned? User must belong to the assigned agency or be a BSMD user
            agency_entries = [spm for spm in shipcall_participant_map if int(spm.type) == int(ParticipantType.AGENCY)]
            if len(agency_entries)>1:
                raise ValidationError({"internal_error":f"Internal error? Found more than one assigned agency for the shipcall with ID {shipcall_id}. Found: {agency_entries}"})

            an_agency_is_assigned = len(agency_entries)==1
            # an empty selection means, that no agency is assigned (indexing the empty list would raise an IndexError)
            assigned_agency = agency_entries[0] if agency_entries else None

        if an_agency_is_assigned:
            assert isinstance(assigned_agency, Participant), f"expecting the assigency agency to be a Participant object. Found: {type(assigned_agency)}"
            assert isinstance(assigned_agency.flags, int), f"this method has currently only been developed with 'flags' being set as an integer. Found: {type(assigned_agency.flags)}"

            # The BSMD-flag of the assigned agency (assigned_agency.flags==1) is not evaluated at the moment.
            # Once the flags are an IntFlag, the check would become: (ParticipantFlag.BSMD in agency_participant.flags)

            ### USER authority ###
            # determine, whether the user is a) the assigned agency or b) a BSMD participant
            user_is_assigned_agency = (user_participant_id == assigned_agency.id)

            # planned rule:
            #   when the BSMD flag is set: the user must be either BSMD or the assigned agency
            #   when the BSMD flag is not set: the user must be the assigned agency
            # current rule: BSMD users and the assigned agency are authorized, regardless of the flag
            user_is_authorized = (user_is_bsmd or user_is_assigned_agency)

            if not user_is_authorized:
                raise werkzeug.exceptions.Forbidden(f"PUT Requests for shipcalls can only be issued by an assigned AGENCY or BSMD users (if the special-flag is enabled). Assigned Agency: {assigned_agency} with Flags: {assigned_agency.flags}") # Forbidden: 403

        else:
            # when there is no assigned agency, only BSMD users can update the shipcall
            if not user_is_bsmd:
                raise werkzeug.exceptions.Forbidden("PUT Requests for shipcalls can only be issued by an assigned AGENCY or BSMD users (if the special-flag is enabled). There is no assigned agency yet, so only BSMD users can change datasets.") # part of a pytest.raises. Forbidden: 403
        return