Added InputValidationTimes for POST, PUT and DELETE requests. Created unit tests that cover the individual validation functions. The input validation is now active in BreCal.api.times. There is now a total of 188 unit tests, all passing. Added some utility functions and stub objects to ease unit testing.
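For orientation, a minimal sketch of how the new validation is invoked from a handler (it mirrors the diff below; the surrounding Flask routing and try/except error handling are assumed):

# sketch only: wiring of the new validator inside a POST handler (assumed Flask request context)
content = request.get_json(force=True)                                          # raw JSON body
loadedModel = model.TimesSchema().load(data=content, many=False, partial=True)   # schema-level checks
user_data = check_jwt()                                                          # user info decoded from the JWT
InputValidationTimes.evaluate_post_data(user_data, loadedModel, content)         # raises marshmallow.ValidationError on invalid input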
This commit is contained in:
parent 4aecb66408
commit 277e28c518
@@ -1,10 +1,11 @@
from flask import Blueprint, request
from ..schemas import model
from .. import impl
from ..services.auth_guard import auth_guard
from ..services.auth_guard import auth_guard, check_jwt
import json
import logging
from marshmallow import ValidationError
from BreCal.validators.input_validation_times import InputValidationTimes

bp = Blueprint('times', __name__)

@@ -31,6 +32,12 @@ def PostTimes():
        # body = parser.parse(schema, request, location='json')
        loadedModel = model.TimesSchema().load(data=content, many=False, partial=True)

        # read the user data from the JWT token (set when login is performed)
        user_data = check_jwt()

        # validate the request
        InputValidationTimes.evaluate_post_data(user_data, loadedModel, content)

    except ValidationError as ex:
        logging.error(ex)
        print(ex)
@@ -52,6 +59,12 @@ def PutTimes():
        content = request.get_json(force=True)
        loadedModel = model.TimesSchema().load(data=content, many=False, partial=True)

        # read the user data from the JWT token (set when login is performed)
        user_data = check_jwt()

        # validate the request
        InputValidationTimes.evaluate_put_data(user_data, loadedModel, content)

    except ValidationError as ex:
        logging.error(ex)
        print(ex)
@@ -69,11 +82,16 @@ def PutTimes():
@auth_guard() # no restriction by role
def DeleteTimes():

    # TODO check if I am allowed to delete this thing by deriving the participant from the bearer token

    if 'id' in request.args:
        options = {}
        options["id"] = request.args.get("id")

        # read the user data from the JWT token (set when login is performed)
        user_data = check_jwt()

        # validate the request
        InputValidationTimes.evaluate_delete_data(user_data, times_id = request.args.get("id"))

        return impl.times.DeleteTimes(options)
    else:
        logging.warning("Times delete missing id argument")
14
src/server/BreCal/database/sql_utils.py
Normal file
@@ -0,0 +1,14 @@
from BreCal.database.sql_handler import execute_sql_query_standalone
import datetime

def get_user_data_for_id(user_id:int, expiration_time:int=90):
    """debugging function that pulls user_data from the database; useful for creating stub data and unit tests"""
    query = "SELECT * FROM user where id = ?id?"
    pdata = execute_sql_query_standalone(query=query, param={"id":user_id})
    pdata = pdata[0] if len(pdata)>0 else None
    assert pdata is not None, f"could not find user with id {user_id}"

    user_data = {k:v for k,v in pdata.items() if k in ['id','participant_id','first_name','last_name','user_name','user_phone','user_email']}
    user_data["exp"] = (datetime.datetime.now()+datetime.timedelta(minutes=expiration_time)).timestamp()
    return user_data

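A brief usage sketch for the helper above (the user id is an arbitrary example; an initialized connection pool is assumed):

# sketch: turn a database row into JWT-style user_data for tests (assumes local_db.initPool was called beforehand)
user_data = get_user_data_for_id(user_id=3)
assert user_data["exp"] > datetime.datetime.now().timestamp()  # expiry lies roughly 90 minutes in the future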
@@ -9,7 +9,7 @@ from typing import List
|
||||
|
||||
import json
|
||||
import datetime
|
||||
from BreCal.validators.time_logic import validate_time_exceeds_threshold
|
||||
from BreCal.validators.time_logic import validate_time_is_in_not_too_distant_future
|
||||
from BreCal.validators.validation_base_utils import check_if_string_has_special_characters
|
||||
from BreCal.database.enums import ParticipantType, ParticipantFlag
|
||||
|
||||
@@ -354,10 +354,10 @@ class TimesSchema(Schema):
|
||||
zone_entry_fixed = fields.Bool(metadata={'required':False}, allow_none=True)
|
||||
operations_start = fields.DateTime(metadata={'required':False}, allow_none=True)
|
||||
operations_end = fields.DateTime(metadata={'required':False}, allow_none=True)
|
||||
remarks = fields.String(metadata={'required':False}, allow_none=True, validate=[validate.Length(max=256)])
|
||||
remarks = fields.String(metadata={'required':False}, allow_none=True, validate=[validate.Length(max=512)])
|
||||
participant_id = fields.Integer(metadata={'required':True})
|
||||
berth_id = fields.Integer(metadata={'required':False}, allow_none = True)
|
||||
berth_info = fields.String(metadata={'required':False}, allow_none=True, validate=[validate.Length(max=256)])
|
||||
berth_info = fields.String(metadata={'required':False}, allow_none=True, validate=[validate.Length(max=512)])
|
||||
pier_side = fields.Bool(metadata={'required':False}, allow_none = True)
|
||||
shipcall_id = fields.Integer(metadata={'required':True})
|
||||
participant_type = fields.Integer(Required = False, allow_none=True)# TODO: could become Enum. fields.Enum(ParticipantType, metadata={'required':False}, allow_none=True, default=ParticipantType.undefined) #fields.Integer(metadata={'required':False}, allow_none=True)
|
||||
@@ -368,11 +368,60 @@ class TimesSchema(Schema):
|
||||
created = fields.DateTime(metadata={'required':False}, allow_none=True)
|
||||
modified = fields.DateTime(metadata={'required':False}, allow_none=True)
|
||||
|
||||
@validates("participant_type")
|
||||
def validate_participant_type(self, value):
|
||||
# #TODO: it may also make sense to block multi-assignments, whereas a value could be BSMD+AGENCY
|
||||
# while the validation fails when one of those multi-assignments is BSMD, it passes in cases,
|
||||
# such as AGENCY+PILOT
|
||||
|
||||
# a participant type should not be .BSMD
|
||||
if not isinstance(value, ParticipantType):
|
||||
value = ParticipantType(value)
|
||||
|
||||
if ParticipantType.BSMD in value:
|
||||
raise ValidationError(f"the participant_type must not be .BSMD")
|
||||
|
||||
@validates("eta_berth")
|
||||
def validate_eta_berth(self, value):
|
||||
threshold_exceeded = validate_time_exceeds_threshold(value, months=12)
|
||||
if threshold_exceeded:
|
||||
raise ValidationError(f"the provided time exceeds the twelve month threshold.")
|
||||
# violation when time is not in the future, but also does not exceed a threshold for the 'reasonable' future
|
||||
# when 'value' is 'None', a ValidationError is not issued.
|
||||
valid_time = validate_time_is_in_not_too_distant_future(raise_validation_error=True, value=value, months=12)
|
||||
return
|
||||
|
||||
@validates("etd_berth")
|
||||
def validate_etd_berth(self, value):
|
||||
# violation when time is not in the future, but also does not exceed a threshold for the 'reasonable' future
|
||||
# when 'value' is 'None', a ValidationError is not issued.
|
||||
valid_time = validate_time_is_in_not_too_distant_future(raise_validation_error=True, value=value, months=12)
|
||||
return
|
||||
|
||||
@validates("lock_time")
|
||||
def validate_lock_time(self, value):
|
||||
# violation when time is not in the future, but also does not exceed a threshold for the 'reasonable' future
|
||||
# when 'value' is 'None', a ValidationError is not issued.
|
||||
valid_time = validate_time_is_in_not_too_distant_future(raise_validation_error=True, value=value, months=12)
|
||||
return
|
||||
|
||||
@validates("zone_entry")
|
||||
def validate_zone_entry(self, value):
|
||||
# violation when time is not in the future, but also does not exceed a threshold for the 'reasonable' future
|
||||
# when 'value' is 'None', a ValidationError is not issued.
|
||||
valid_time = validate_time_is_in_not_too_distant_future(raise_validation_error=True, value=value, months=12)
|
||||
return
|
||||
|
||||
@validates("operations_start")
|
||||
def validate_operations_start(self, value):
|
||||
# violation when time is not in the future, but also does not exceed a threshold for the 'reasonable' future
|
||||
# when 'value' is 'None', a ValidationError is not issued.
|
||||
valid_time = validate_time_is_in_not_too_distant_future(raise_validation_error=True, value=value, months=12)
|
||||
return
|
||||
|
||||
@validates("operations_end")
|
||||
def validate_operations_end(self, value):
|
||||
# violation when time is not in the future, but also does not exceed a threshold for the 'reasonable' future
|
||||
# when 'value' is 'None', a ValidationError is not issued.
|
||||
valid_time = validate_time_is_in_not_too_distant_future(raise_validation_error=True, value=value, months=12)
|
||||
return
|
||||
|
||||
# deserialize PUT object target
|
||||
|
||||
|
||||
@@ -26,7 +26,22 @@ def generate_jwt(payload, lifetime=None):
    return jwt.encode(payload, os.environ.get('SECRET_KEY'), algorithm="HS256")

def decode_jwt(token):
    """this function reverts the {generate_jwt} function. An encoded JWT token is decoded into a JSON dictionary."""
    """
    this function reverts the {generate_jwt} function. An encoded JWT token is decoded into a JSON dictionary.
    The function is commonly used to decode a login-token and obtain a 'user_data' variable, which is a dictionary.

    Example of 'user_data':
        {
        'id': 1,
        'participant_id': 1,
        'first_name': 'Firstname',
        'last_name': 'Lastname',
        'user_name': 'xUsername01',
        'user_phone': '+01 123 456 7890',
        'user_email': 'firstname.lastname@internet.com',
        'exp': 1716881626.056438 # expiration timestamp
        }
    """
    return jwt.decode(token, os.environ.get('SECRET_KEY'), algorithms=["HS256"])

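A short round-trip sketch (assumes SECRET_KEY is set in the environment; the payload values are made up):

# sketch: encode a minimal payload and decode it back into a dictionary
token = generate_jwt({"id": 1, "participant_id": 1})
user_data = decode_jwt(token)  # -> dict containing at least 'id' and 'participant_id'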
@@ -5,10 +5,12 @@ users will thereby be able to modify these values
|
||||
import datetime
|
||||
|
||||
from BreCal.stubs import generate_uuid1_int
|
||||
from BreCal.schemas.model import Times
|
||||
from BreCal.schemas.model import Times, ParticipantType
|
||||
from BreCal.database.sql_utils import get_user_data_for_id
|
||||
|
||||
|
||||
def get_times_full_simple():
|
||||
|
||||
def get_times_full_simple(return_dataclass=True):
|
||||
# only used for the stub
|
||||
base_time = datetime.datetime.now()
|
||||
|
||||
@@ -49,32 +51,86 @@ def get_times_full_simple():
|
||||
created = datetime.datetime.now()
|
||||
modified = created+datetime.timedelta(seconds=10)
|
||||
|
||||
times = Times(
|
||||
id=times_id,
|
||||
eta_berth=eta_berth,
|
||||
eta_berth_fixed=eta_berth_fixed,
|
||||
etd_berth=etd_berth,
|
||||
etd_berth_fixed=etd_berth_fixed,
|
||||
lock_time=lock_time,
|
||||
lock_time_fixed=lock_time_fixed,
|
||||
zone_entry=zone_entry,
|
||||
zone_entry_fixed=zone_entry_fixed,
|
||||
operations_start=operations_start,
|
||||
operations_end=operations_end,
|
||||
remarks=remarks,
|
||||
participant_id=participant_id,
|
||||
berth_id=berth_id,
|
||||
berth_info=berth_info,
|
||||
pier_side=pier_side,
|
||||
participant_type=participant_type,
|
||||
shipcall_id=shipcall_id,
|
||||
ata=ata,
|
||||
atd=atd,
|
||||
eta_interval_end=eta_interval_end,
|
||||
etd_interval_end=etd_interval_end,
|
||||
created=created,
|
||||
modified=modified,
|
||||
)
|
||||
if return_dataclass:
|
||||
times = Times(
|
||||
id=times_id,
|
||||
eta_berth=eta_berth,
|
||||
eta_berth_fixed=eta_berth_fixed,
|
||||
etd_berth=etd_berth,
|
||||
etd_berth_fixed=etd_berth_fixed,
|
||||
lock_time=lock_time,
|
||||
lock_time_fixed=lock_time_fixed,
|
||||
zone_entry=zone_entry,
|
||||
zone_entry_fixed=zone_entry_fixed,
|
||||
operations_start=operations_start,
|
||||
operations_end=operations_end,
|
||||
remarks=remarks,
|
||||
participant_id=participant_id,
|
||||
berth_id=berth_id,
|
||||
berth_info=berth_info,
|
||||
pier_side=pier_side,
|
||||
participant_type=participant_type,
|
||||
shipcall_id=shipcall_id,
|
||||
ata=ata,
|
||||
atd=atd,
|
||||
eta_interval_end=eta_interval_end,
|
||||
etd_interval_end=etd_interval_end,
|
||||
created=created,
|
||||
modified=modified,
|
||||
)
|
||||
else:
|
||||
times = dict(
|
||||
id=times_id,
|
||||
eta_berth=eta_berth,
|
||||
eta_berth_fixed=eta_berth_fixed,
|
||||
etd_berth=etd_berth,
|
||||
etd_berth_fixed=etd_berth_fixed,
|
||||
lock_time=lock_time,
|
||||
lock_time_fixed=lock_time_fixed,
|
||||
zone_entry=zone_entry,
|
||||
zone_entry_fixed=zone_entry_fixed,
|
||||
operations_start=operations_start,
|
||||
operations_end=operations_end,
|
||||
remarks=remarks,
|
||||
participant_id=participant_id,
|
||||
berth_id=berth_id,
|
||||
berth_info=berth_info,
|
||||
pier_side=pier_side,
|
||||
participant_type=participant_type,
|
||||
shipcall_id=shipcall_id,
|
||||
ata=ata,
|
||||
atd=atd,
|
||||
eta_interval_end=eta_interval_end,
|
||||
etd_interval_end=etd_interval_end,
|
||||
created=created,
|
||||
modified=modified,)
|
||||
times = {k:v.isoformat() if isinstance(v, datetime.datetime) else v for k,v in times.items()}
|
||||
return times
|
||||
|
||||
def get_valid_stub_times():
|
||||
"""create a stub entry for a times dataset, which is valid"""
|
||||
times_entry = get_times_full_simple(return_dataclass=False)
|
||||
|
||||
times_entry.pop('id',None)
|
||||
times_entry["participant_id"] = 136
|
||||
times_entry["participant_type"] = int(ParticipantType.PILOT)
|
||||
times_entry["shipcall_id"] = 222
|
||||
times_entry["berth_id"] = 143
|
||||
times_entry["remarks"] = "stub entry."
|
||||
return times_entry
|
||||
|
||||
|
||||
def get_valid_stub_times_loaded_model(post_data=None):
|
||||
from BreCal.schemas import model
|
||||
if post_data is None:
|
||||
post_data = get_valid_stub_times()
|
||||
loadedModel = model.TimesSchema().load(data=post_data, many=False, partial=True)
|
||||
return loadedModel
|
||||
|
||||
def get_valid_stub_for_pytests(user_id:int=3):
|
||||
user_data = get_user_data_for_id(user_id=user_id)
|
||||
post_data = get_valid_stub_times()
|
||||
|
||||
content = post_data
|
||||
loadedModel = get_valid_stub_times_loaded_model(post_data=post_data)
|
||||
return user_data, loadedModel, content
|
||||
|
||||
@@ -82,7 +82,7 @@ class InputValidationShip():
|
||||
def check_user_is_bsmd_type(user_data:dict):
|
||||
is_bsmd = check_if_user_is_bsmd_type(user_data)
|
||||
if not is_bsmd:
|
||||
raise ValidationError(f"current user does not belong to BSMD. Cannot post shipcalls. Found user data: {user_data}")
|
||||
raise ValidationError(f"current user does not belong to BSMD. Cannot post, put or delete ships. Found user data: {user_data}")
|
||||
|
||||
@staticmethod
|
||||
def check_ship_imo_already_exists(loadedModel:dict):
|
||||
|
||||
@@ -188,7 +188,7 @@ class InputValidationShipcall():
|
||||
"""
|
||||
is_bsmd = check_if_user_is_bsmd_type(user_data)
|
||||
if not is_bsmd:
|
||||
raise ValidationError(f"current user does not belong to BSMD. Cannot post shipcalls. Found user data: {user_data}")
|
||||
raise ValidationError(f"current user does not belong to BSMD. Cannot post or put shipcalls. Found user data: {user_data}")
|
||||
return
|
||||
|
||||
@staticmethod
|
||||
|
||||
@@ -12,11 +12,54 @@ from BreCal.impl.berths import GetBerths
|
||||
from BreCal.impl.times import GetTimes
|
||||
|
||||
from BreCal.database.enums import ParticipantType, ParticipantFlag
|
||||
from BreCal.validators.input_validation_utils import check_if_user_is_bsmd_type, check_if_ship_id_is_valid, check_if_berth_id_is_valid, check_if_participant_ids_are_valid, check_if_participant_ids_and_types_are_valid, get_shipcall_id_dictionary, get_participant_type_from_user_data
|
||||
from BreCal.validators.input_validation_utils import check_if_user_is_bsmd_type, check_if_ship_id_is_valid, check_if_berth_id_is_valid, check_if_participant_ids_are_valid, check_if_participant_ids_and_types_are_valid, check_if_shipcall_id_is_valid, get_shipcall_id_dictionary, get_participant_type_from_user_data, get_participant_id_dictionary, check_if_participant_id_is_valid_standalone
|
||||
from BreCal.database.sql_handler import execute_sql_query_standalone
|
||||
from BreCal.validators.validation_base_utils import check_if_int_is_valid_flag, check_if_string_has_special_characters
|
||||
import werkzeug
|
||||
|
||||
def build_post_data_type_dependent_required_fields_dict()->dict[ShipcallType,dict[ParticipantType,typing.Optional[list[str]]]]:
|
||||
"""
|
||||
The required fields of a POST-request depend on ShipcallType and ParticipantType. This function creates
|
||||
a dictionary, which maps those types to a list of required fields.
|
||||
|
||||
The participant types 'undefined' and 'bsmd' should not be used in POST-requests. They return 'None'.
|
||||
"""
|
||||
post_data_type_dependent_required_fields_dict = {
|
||||
ShipcallType.arrival:{
|
||||
ParticipantType.undefined:None, # should not be set in POST requests
|
||||
ParticipantType.BSMD:None, # should not be set in POST requests
|
||||
ParticipantType.TERMINAL:["operations_start"],
|
||||
ParticipantType.AGENCY:["eta_berth"],
|
||||
ParticipantType.MOORING:["eta_berth"],
|
||||
ParticipantType.PILOT:["eta_berth"],
|
||||
ParticipantType.PORT_ADMINISTRATION:["eta_berth"],
|
||||
ParticipantType.TUG:["eta_berth"],
|
||||
},
|
||||
ShipcallType.departure:{
|
||||
ParticipantType.undefined:None, # should not be set in POST requests
|
||||
ParticipantType.BSMD:None, # should not be set in POST requests
|
||||
ParticipantType.TERMINAL:["operations_end"],
|
||||
ParticipantType.AGENCY:["etd_berth"],
|
||||
ParticipantType.MOORING:["etd_berth"],
|
||||
ParticipantType.PILOT:["etd_berth"],
|
||||
ParticipantType.PORT_ADMINISTRATION:["etd_berth"],
|
||||
ParticipantType.TUG:["etd_berth"],
|
||||
},
|
||||
ShipcallType.shifting:{
|
||||
ParticipantType.undefined:None, # should not be set in POST requests
|
||||
ParticipantType.BSMD:None, # should not be set in POST requests
|
||||
ParticipantType.TERMINAL:["operations_start", "operations_end"],
|
||||
ParticipantType.AGENCY:["eta_berth", "etd_berth"],
|
||||
ParticipantType.MOORING:["eta_berth", "etd_berth"],
|
||||
ParticipantType.PILOT:["eta_berth", "etd_berth"],
|
||||
ParticipantType.PORT_ADMINISTRATION:["eta_berth", "etd_berth"],
|
||||
ParticipantType.TUG:["eta_berth", "etd_berth"],
|
||||
},
|
||||
}
|
||||
return post_data_type_dependent_required_fields_dict
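For orientation, a minimal lookup sketch using the mapping above:

# sketch: required fields for a TERMINAL participant on a shifting shipcall
required = build_post_data_type_dependent_required_fields_dict()
assert required[ShipcallType.shifting][ParticipantType.TERMINAL] == ["operations_start", "operations_end"]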
|
||||
|
||||
|
||||
|
||||
class InputValidationTimes():
|
||||
"""
|
||||
This class combines a complex set of individual input validation functions into a joint object.
|
||||
@@ -32,61 +75,328 @@ class InputValidationTimes():
|
||||
|
||||
@staticmethod
|
||||
def evaluate_post_data(user_data:dict, loadedModel:dict, content:dict):
|
||||
raise NotImplementedError("skeleton")
|
||||
# 0.) Check for the presence of required fields
|
||||
InputValidationTimes.check_times_required_fields_post_data(loadedModel, content)
|
||||
|
||||
# 1.) datasets may only be created, if the current user fits the appropriate type in the ShipcallParticipantMap
|
||||
InputValidationTimes.check_if_user_fits_shipcall_participant_map(user_data, loadedModel, content)
|
||||
|
||||
# 2.) datasets may only be created, if the respective participant type did not already create one.
|
||||
InputValidationTimes.check_if_entry_already_exists_for_participant_type(user_data, loadedModel, content)
|
||||
|
||||
# 3.) only users who are *not* of type BSMD may post times datasets.
|
||||
InputValidationTimes.check_user_is_not_bsmd_type(user_data)
|
||||
|
||||
# 4.) Reference checking
|
||||
InputValidationTimes.check_dataset_references(content)
|
||||
|
||||
# 5.) Value checking
|
||||
InputValidationTimes.check_dataset_values(user_data, loadedModel, content)
|
||||
return
|
||||
|
||||
@staticmethod
|
||||
def evaluate_put_data(user_data:dict, loadedModel:dict, content:dict):
|
||||
raise NotImplementedError("skeleton")
|
||||
# 1.) Only users of the same participant_id, which the times dataset refers to, can delete the entry
|
||||
# (same as for .evaluate_delete_data)
|
||||
# 1.) Check for the presence of required fields
|
||||
InputValidationTimes.check_times_required_fields_put_data(content)
|
||||
|
||||
# 2.) Reference checking
|
||||
# (same as for .evaluate_post_data)
|
||||
# 2.) Only users of the same participant_id, which the times dataset refers to, can update the entry
|
||||
InputValidationTimes.check_user_belongs_to_same_group_as_dataset_determines(user_data, loadedModel=loadedModel, times_id=None)
|
||||
|
||||
# 3.) Value checking
|
||||
# (same as for .evaluate_post_data)
|
||||
# participant type should be dynamically checked for POST / PUT. All other values can be validated within the Schema
|
||||
# 3.) Reference checking
|
||||
InputValidationTimes.check_dataset_references(content)
|
||||
|
||||
# 4.) Value checking
|
||||
InputValidationTimes.check_dataset_values(user_data, loadedModel, content)
|
||||
return
|
||||
|
||||
@staticmethod
|
||||
def evaluate_delete_data(user_data:dict, times_id:int):
|
||||
raise NotImplementedError("skeleton")
|
||||
# 1.) Only users of the same participant_id, which the times dataset refers to, can delete the entry
|
||||
# (same as for .evaluate_put_data)
|
||||
# #TODO_determine: is times_id always an int or does the request.args call provide a string?
|
||||
times_id = int(times_id) if not isinstance(times_id, int) else times_id
|
||||
|
||||
# 2.) The dataset entry may not be deleted already
|
||||
|
||||
InputValidationTimes.check_if_entry_is_already_deleted(times_id, shipcall_id)
|
||||
# 1.) The dataset entry may not be deleted already
|
||||
InputValidationTimes.check_if_entry_is_already_deleted(times_id)
|
||||
|
||||
# 2.) Only users of the same participant_id, which the times dataset refers to, can delete the entry
|
||||
InputValidationTimes.check_user_belongs_to_same_group_as_dataset_determines(user_data, loadedModel=None, times_id=times_id)
|
||||
return
|
||||
|
||||
@staticmethod
|
||||
def check_if_entry_is_already_deleted(times_id:int, shipcall_id:int):
|
||||
def check_if_entry_is_already_deleted(times_id:int):
|
||||
"""
|
||||
When calling a delete request for ships, the dataset may not be deleted already. This method
|
||||
makes sure that the request contains an ID, has a matching entry in the database, and the
|
||||
database entry may not have a deletion state already.
|
||||
When calling a delete request for times, the dataset may not be deleted already. This method
|
||||
makes sure that the request contains an ID and has a matching entry in the database.
|
||||
When a times dataset is deleted, it is directly removed from the database.
|
||||
|
||||
To identify deleted entries, query from the database and check, whether there is a match for the times id.
|
||||
|
||||
"""
|
||||
raise NotImplementedError("skeleton. fully untested pseudo-code.")
|
||||
if times_id is None:
|
||||
raise ValidationError(f"The times_id must be provided.")
|
||||
# perform an SQL query. Creates a pooled connection internally, queries the database, then closes the connection.
|
||||
query = "SELECT shipcall_id FROM times WHERE id = ?id?"
|
||||
pdata = execute_sql_query_standalone(query=query, param={"id":times_id}, pooledConnection=None)
|
||||
|
||||
# options["shipcall_id"]
|
||||
assert 'shipcall_id' in options.keys()
|
||||
if len(pdata)==0:
|
||||
raise ValidationError(f"The selected time entry is already deleted. ID: {times_id}")
|
||||
return
|
||||
|
||||
response, status_code, header = GetTimes(options)
|
||||
times = json.loads(response)
|
||||
existing_database_entries = [time_ for time_ in times if time_.get("id")==times_id]
|
||||
if len(existing_database_entries)==0:
|
||||
raise ValidationError(f"Could not find a times entry with the specified ID. Selected: {times_id}")
|
||||
@staticmethod
|
||||
def check_user_is_not_bsmd_type(user_data:dict):
|
||||
"""a new dataset may only be created by a user who is *not* belonging to participant group BSMD"""
|
||||
is_bsmd = check_if_user_is_bsmd_type(user_data)
|
||||
if is_bsmd:
|
||||
raise ValidationError(f"current user belongs to BSMD. Cannot post 'times' datasets. Found user data: {user_data}")
|
||||
return
|
||||
|
||||
existing_database_entry = existing_database_entries[0]
|
||||
@staticmethod
|
||||
def check_dataset_values(user_data:dict, loadedModel:dict, content:dict):
|
||||
"""
|
||||
this method validates POST and PUT data. Most of the dataset arguments are validated directly in the
|
||||
BreCal.schemas.model.TimesSchema, using @validates. That path is reserved for 'simple' validation rules.
|
||||
|
||||
deletion_state = existing_database_entry.get("deleted",None)
|
||||
if deletion_state:
|
||||
raise ValidationError(f"The selected time entry is already deleted.")
|
||||
This applies to:
|
||||
"remarks" & "berth_info"
|
||||
"eta_berth", "etd_berth", "lock_time", "zone_entry", "operations_start", "operations_end"
|
||||
"""
|
||||
# while InputValidationTimes.check_user_is_not_bsmd_type already validates a user, this method
|
||||
# validates the times dataset.
|
||||
|
||||
# ensure loadedModel["participant_type"] is of type ParticipantType
|
||||
if not isinstance(loadedModel["participant_type"], ParticipantType):
|
||||
loadedModel["participant_type"] = ParticipantType(loadedModel["participant_type"])
|
||||
|
||||
if ParticipantType.BSMD in loadedModel["participant_type"]:
|
||||
raise ValidationError(f"current user belongs to BSMD. Cannot post times datasets. Found user data: {user_data}")
|
||||
return
|
||||
|
||||
@staticmethod
|
||||
def check_dataset_references(content:dict):
|
||||
"""
|
||||
When IDs are referenced, they must exist in the database. This method individually validates the existence of the referenced
|
||||
berth ID, participant IDs and shipcall ID.
|
||||
|
||||
Note: whenever an ID is 'None', no exception is raised, because a different method is supposed to capture non-existent mandatory fields.
|
||||
"""
|
||||
# extract the IDs
|
||||
berth_id, participant_id, shipcall_id = content.get("berth_id"), content.get("participant_id"), content.get("shipcall_id")
|
||||
|
||||
valid_berth_id_reference = check_if_berth_id_is_valid(berth_id)
|
||||
if not valid_berth_id_reference:
|
||||
raise ValidationError(f"The referenced berth_id '{berth_id}' does not exist in the database.")
|
||||
|
||||
valid_shipcall_id_reference = check_if_shipcall_id_is_valid(shipcall_id)
|
||||
if not valid_shipcall_id_reference:
|
||||
raise ValidationError(f"The referenced shipcall_id '{shipcall_id}' does not exist in the database.")
|
||||
|
||||
valid_participant_id_reference = check_if_participant_id_is_valid_standalone(participant_id)
|
||||
if not valid_participant_id_reference:
|
||||
raise ValidationError(f"The referenced participant_id '{participant_id}' does not exist in the database.")
|
||||
|
||||
return
|
||||
|
||||
@staticmethod
|
||||
def check_times_required_fields_post_data(loadedModel:dict, content:dict):
|
||||
"""
|
||||
Depending on ShipcallType and ParticipantType, there is a rather complex set of required fields.
|
||||
Independent of those types, any POST request for times should always include the default fields.
|
||||
|
||||
The dependent and independent fields are validated by checking whether the respective value in 'content'
|
||||
is undefined (returns None). When any of these fields is undefined, a ValidationError is raised.
|
||||
"""
|
||||
participant_type = loadedModel["participant_type"]
|
||||
shipcall_id = loadedModel["shipcall_id"]
|
||||
|
||||
# build a dictionary of id:item pairs, so one can select the respective participant
|
||||
# must look-up the shipcall_type based on the shipcall_id
|
||||
shipcalls = get_shipcall_id_dictionary()
|
||||
shipcall_type = ShipcallType[shipcalls.get(shipcall_id,{}).get("type",ShipcallType.undefined.name)]
|
||||
|
||||
if (participant_type is None) or (int(shipcall_type) == int(ShipcallType.undefined)):
|
||||
raise ValidationError(f"At least one of the required fields is missing. Missing: 'participant_type' or 'shipcall_type'")
|
||||
|
||||
|
||||
# build a list of required fields based on shipcall and participant type, as well as type-independent fields
|
||||
independent_required_fields = InputValidationTimes.get_post_data_type_independent_fields()
|
||||
dependent_required_fields = InputValidationTimes.get_post_data_type_dependent_fields(shipcall_type, participant_type)
|
||||
|
||||
required_fields = independent_required_fields + dependent_required_fields
|
||||
|
||||
# generate a list of booleans, where each element shows, whether one of the required fields is missing.
|
||||
missing_required_fields = [
|
||||
content.get(field,None) is None for field in required_fields
|
||||
]
|
||||
|
||||
if any(missing_required_fields):
|
||||
# create a tuple of (field_key, bool) to describe to a user, which one of the fields may be missing
|
||||
verbosity_tuple = [(field, missing) for field, missing in zip(required_fields, missing_required_fields) if missing]
|
||||
raise ValidationError(f"At least one of the required fields is missing. Missing: {verbosity_tuple}")
|
||||
return
|
||||
|
||||
@staticmethod
|
||||
def check_times_required_fields_put_data(content:dict):
|
||||
"""in a PUT request, only the 'id' is a required field. All other fields are simply ignored, when they are not provided."""
|
||||
if content.get("id") is None:
|
||||
raise ValidationError(f"A PUT-request requires an 'id' reference, which was not found.")
|
||||
return
|
||||
|
||||
@staticmethod
|
||||
def get_post_data_type_independent_fields()->list[str]:
|
||||
"""
|
||||
Independent of the ShipcallType and ParticipantType, any POST request for times should always include the default fields.
|
||||
"""
|
||||
independent_required_fields = [
|
||||
"shipcall_id", "participant_id", "participant_type"
|
||||
]
|
||||
return independent_required_fields
|
||||
|
||||
@staticmethod
|
||||
def get_post_data_type_dependent_fields(shipcall_type:typing.Union[int, ShipcallType], participant_type:typing.Union[int, ParticipantType]):
|
||||
"""
|
||||
Depending on ShipcallType and ParticipantType, there is a rather complex set of required fields.
|
||||
|
||||
Arriving shipcalls need arrival times (e.g., 'eta'), Departing shipcalls need departure times (e.g., 'etd') and
|
||||
Shifting shipcalls need both times (e.g., 'eta' and 'etd').
|
||||
|
||||
Further, the ParticipantType determines the set of relevant times. In particular, the terminal uses
|
||||
'operations_start' and 'operations_end', while other users use 'eta_berth' or 'etd_berth'.
|
||||
"""
|
||||
# ensure that both, shipcall_type and participant_type, refer to the enumerators, as opposed to integers.
|
||||
if not isinstance(shipcall_type, ShipcallType):
|
||||
shipcall_type = ShipcallType(shipcall_type)
|
||||
if not isinstance(participant_type, ParticipantType):
|
||||
participant_type = ParticipantType(participant_type)
|
||||
|
||||
# build a dictionary, which maps shipcall type and participant type to a list of fields
|
||||
dependent_required_fields_dict = build_post_data_type_dependent_required_fields_dict()
|
||||
|
||||
# select shipcall type & participant type
|
||||
dependent_required_fields = dependent_required_fields_dict.get(shipcall_type,{}).get(participant_type,None)
|
||||
return dependent_required_fields
|
||||
|
||||
@staticmethod
|
||||
def check_if_user_fits_shipcall_participant_map(user_data:dict, loadedModel:dict, content:dict, spm_shipcall_data=None):
|
||||
"""
|
||||
a new dataset may only be created, if the user belongs to the participant group (participant_id),
|
||||
which is assigned to the shipcall within the ShipcallParticipantMap
|
||||
|
||||
This method does not validate what the POST-request contains, but rather whether the *user* is
|
||||
authorized to send the request.
|
||||
|
||||
options:
|
||||
spm_shipcall_data:
|
||||
data from the ShipcallParticipantMap, which refers to the respective shipcall ID. The SPM can be
|
||||
an optional argument to allow for much easier unit testing.
|
||||
"""
|
||||
|
||||
# identify shipcall_id
|
||||
shipcall_id = loadedModel["shipcall_id"]
|
||||
|
||||
# identify user's participant_id & type (get all participants; then filter these for the {participant_id})
|
||||
participant_id = user_data["participant_id"] #participants = get_participant_id_dictionary() #participant_type = ParticipantType(participants.get(participant_id,{}).get("type"))
|
||||
participant_type = ParticipantType(loadedModel["participant_type"]) if not isinstance(loadedModel["participant_type"],ParticipantType) else loadedModel["participant_type"]
|
||||
|
||||
# get ShipcallParticipantMap for the shipcall_id
|
||||
if spm_shipcall_data is None:
|
||||
# read the ShipcallParticipantMap entry of the current shipcall_id. This is used within the input validation of a PUT request
|
||||
# creates a list of {'participant_id: ..., 'type': ...} elements
|
||||
spm_shipcall_data = execute_sql_query_standalone(
|
||||
query = "SELECT participant_id, type FROM shipcall_participant_map WHERE shipcall_id=?shipcall_id?",
|
||||
param={"shipcall_id":shipcall_id},
|
||||
pooledConnection=None
|
||||
)
|
||||
|
||||
# check, if participant_id is assigned to the ShipcallParticipantMap
|
||||
matching_spm = [
|
||||
spm
|
||||
for spm in spm_shipcall_data
|
||||
if spm.get("participant_id")==participant_id
|
||||
]
|
||||
|
||||
if not len(matching_spm)>0:
|
||||
raise ValidationError(f'The participant group with id {participant_id} is not assigned to the shipcall. Found ShipcallParticipantMap: {spm_shipcall_data}')
|
||||
|
||||
# check, if the assigned participant_id is assigned with the same role
|
||||
matching_spm_element = matching_spm[0]
|
||||
matching_spm_element_participant_type = ParticipantType(matching_spm_element.get("type"))
|
||||
if not matching_spm_element_participant_type in participant_type:
|
||||
raise ValidationError(f'The participant group with id {participant_id} is assigned to the shipcall in a different role. Request Role: {participant_type}, ShipcallParticipantMap Role Assignment: {matching_spm_element_participant_type}')
|
||||
return
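A test-style sketch of the spm_shipcall_data shortcut mentioned in the docstring above (the IDs mirror the stub values used elsewhere in this commit; passing the list skips the SQL lookup):

# sketch: validate against a prefabricated ShipcallParticipantMap excerpt instead of the database
spm_stub = [{"participant_id": 136, "type": int(ParticipantType.PILOT)}]
InputValidationTimes.check_if_user_fits_shipcall_participant_map(
    user_data={"participant_id": 136},
    loadedModel={"shipcall_id": 222, "participant_type": ParticipantType.PILOT},
    content={},  # accepted by the signature but not referenced in the body shown above
    spm_shipcall_data=spm_stub,
)  # passes; a mismatching participant_id or role raises a ValidationError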
|
||||
|
||||
@staticmethod
|
||||
def check_if_entry_already_exists_for_participant_type(user_data:dict, loadedModel:dict, content:dict):
|
||||
"""determines, whether a dataset for the participant type is already present"""
|
||||
# determine participant_type and shipcall_id from the loadedModel
|
||||
participant_type = loadedModel["participant_type"]
|
||||
if not isinstance(participant_type, ParticipantType): # ensure the correct data type
|
||||
participant_type = ParticipantType(participant_type)
|
||||
shipcall_id = loadedModel["shipcall_id"]
|
||||
|
||||
# get all times entries of the shipcall_id from the database
|
||||
times, status_code, headers = GetTimes(options={"shipcall_id":shipcall_id})
|
||||
times = json.loads(times)
|
||||
|
||||
# check, if there is already a dataset for the participant type
|
||||
participant_type_exists_already = any([ParticipantType(time_.get("participant_type",0)) in participant_type for time_ in times])
|
||||
if participant_type_exists_already:
|
||||
raise ValidationError(f"A dataset for the participant type is already present. Participant Type: {participant_type}. Times Datasets: {times}")
|
||||
return
|
||||
|
||||
@staticmethod
|
||||
def check_user_belongs_to_same_group_as_dataset_determines(user_data:dict, loadedModel:typing.Optional[dict]=None, times_id:typing.Optional[int]=None):
|
||||
"""
|
||||
This method checks whether a user belongs to the same participant_id as the dataset entry refers to.
|
||||
It is used in both PUT requests and DELETE requests, but uses different arguments to determine the matching
|
||||
time dataset entry.
|
||||
|
||||
PUT:
|
||||
loadedModel is unbundled to identify the matching times entry by the shipcall id
|
||||
|
||||
DELETE:
|
||||
times_id is used to directly identify the matching times entry
|
||||
"""
|
||||
assert not ((loadedModel is None) and (times_id is None)), f"must provide either loadedModel OR times_id. Both are 'None'"
|
||||
assert (loadedModel is None) or (times_id is None), f"must provide either loadedModel OR times_id. Both are defined."
|
||||
|
||||
# identify the user's participant id
|
||||
user_participant_id = user_data["participant_id"]
|
||||
|
||||
if loadedModel is not None:
|
||||
shipcall_id = loadedModel["shipcall_id"]
|
||||
participant_type = loadedModel["participant_type"]
|
||||
|
||||
# get all times entries of the shipcall_id from the database as a list of {'participant_id':..., 'participant_type':...} elements
|
||||
query = "SELECT participant_id, participant_type FROM times WHERE shipcall_id = ?shipcall_id?"
|
||||
times = execute_sql_query_standalone(query=query, param={"shipcall_id":shipcall_id}, pooledConnection=None)
|
||||
|
||||
# get the matching datasets, where the participant id is identical
|
||||
time_datasets_of_participant_type = [time_ for time_ in times if time_.get("participant_type")==participant_type]
|
||||
|
||||
# when there are no matching participants, raise a ValidationError
|
||||
if not len(time_datasets_of_participant_type)>0:
|
||||
raise ValidationError(f"Could not find a matching time dataset for the provided participant_type: {participant_type}. Found Time Datasets: {times}")
|
||||
|
||||
# take the first match. There should always be only one match.
|
||||
time_datasets_of_participant_type = time_datasets_of_participant_type[0]
|
||||
participant_id_of_times_dataset = time_datasets_of_participant_type.get("participant_id")
|
||||
|
||||
if times_id is not None:
|
||||
# perform an SQL query. Creates a pooled connection internally, queries the database, then closes the connection.
|
||||
query = "SELECT participant_id FROM times WHERE id = ?id?"
|
||||
pdata = execute_sql_query_standalone(query=query, param={"id":times_id}, pooledConnection=None)
|
||||
|
||||
# extracts the participant_id from the first matching entry, if applicable
|
||||
if not len(pdata)>0:
|
||||
# this case is usually covered by the InputValidationTimes.check_if_entry_is_already_deleted method already
|
||||
raise ValidationError(f"Unknown times_id. Could not find a matching entry for ID: {times_id}")
|
||||
else:
|
||||
participant_id_of_times_dataset = pdata[0].get("participant_id")
|
||||
|
||||
if user_participant_id != participant_id_of_times_dataset:
|
||||
raise ValidationError(f"The dataset may only be changed by a user belonging to the same participant group as the times dataset is referring to. User participant_id: {user_participant_id}; Dataset participant_id: {participant_id_of_times_dataset}")
|
||||
return
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
@@ -105,6 +105,30 @@ def check_if_berth_id_is_valid(berth_id):
|
||||
berth_id_is_valid = berth_id in list(berths.keys())
|
||||
return berth_id_is_valid
|
||||
|
||||
def check_if_shipcall_id_is_valid(shipcall_id:int):
|
||||
"""check, whether the provided ID is valid. If it is 'None', it will be considered valid. This is, because a request, may not have to include all IDs at once"""
|
||||
if shipcall_id is None:
|
||||
return True
|
||||
|
||||
# build a dictionary of id:item pairs, so one can select the respective participant
|
||||
shipcalls = get_shipcall_id_dictionary()
|
||||
|
||||
# boolean check
|
||||
shipcall_id_is_valid = shipcall_id in list(shipcalls.keys())
|
||||
return shipcall_id_is_valid
|
||||
|
||||
def check_if_participant_id_is_valid_standalone(participant_id:int):
|
||||
"""check, whether the provided ID is valid. If it is 'None', it will be considered valid. This is, because a request, may not have to include all IDs at once"""
|
||||
if participant_id is None:
|
||||
return True
|
||||
|
||||
# build a dictionary of id:item pairs, so one can select the respective participant
|
||||
participants = get_participant_id_dictionary()
|
||||
|
||||
# boolean check
|
||||
participant_id_is_valid = participant_id in list(participants.keys())
|
||||
return participant_id_is_valid
|
||||
|
||||
def check_if_participant_id_is_valid(participant:dict):
|
||||
"""
|
||||
check whether the provided ID is valid. If it is 'None', it is considered valid, because a shipcall POST-request does not have to include all IDs at once
|
||||
@@ -115,15 +139,7 @@ def check_if_participant_id_is_valid(participant:dict):
|
||||
"""
|
||||
# #TODO1: Daniel Schick: 'types may only appear once and must not include type "BSMD"'
|
||||
participant_id = participant.get("participant_id", None)
|
||||
|
||||
if participant_id is None:
|
||||
return True
|
||||
|
||||
# build a dictionary of id:item pairs, so one can select the respective participant
|
||||
participants = get_participant_id_dictionary()
|
||||
|
||||
# boolean check
|
||||
participant_id_is_valid = participant_id in list(participants.keys())
|
||||
participant_id_is_valid = check_if_participant_id_is_valid_standalone(participant_id)
|
||||
return participant_id_is_valid
|
||||
|
||||
def check_if_participant_ids_are_valid(participants:list[dict]):
|
||||
@@ -135,6 +151,10 @@ def check_if_participant_ids_are_valid(participants:list[dict]):
|
||||
'participant_id' : int
|
||||
'type' : ParticipantType
|
||||
"""
|
||||
# empty list -> invalid
|
||||
if participants is None:
|
||||
return False
|
||||
|
||||
# check each participant id individually
|
||||
valid_participant_ids = [check_if_participant_id_is_valid(participant) for participant in participants]
|
||||
|
||||
|
||||
@@ -2,6 +2,8 @@ import datetime
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
|
||||
from marshmallow import ValidationError
|
||||
|
||||
def validate_time_exceeds_threshold(value:datetime.datetime, seconds:int=60, minutes:int=60, hours:int=24, days:int=30, months:int=12)->bool:
|
||||
"""returns a boolean when the input value is very distant in the future. The parameters provide the threshold"""
|
||||
# time difference in seconds. Positive: in the future, Negative: in the past
|
||||
@@ -10,6 +12,37 @@ def validate_time_exceeds_threshold(value:datetime.datetime, seconds:int=60, min
|
||||
threshold = seconds*minutes*hours*days*months
|
||||
return time_>=threshold
|
||||
|
||||
def validate_time_is_in_future(value:datetime.datetime):
|
||||
"""returns a boolean when the input value is in the future."""
|
||||
current_time = datetime.datetime.now()
|
||||
return value >= current_time
|
||||
|
||||
def validate_time_is_in_not_too_distant_future(raise_validation_error:bool, value:datetime.datetime, seconds:int=60, minutes:int=60, hours:int=24, days:int=30, months:int=12)->bool:
|
||||
"""
|
||||
combines two boolean operations. Returns True when both conditions are met.
|
||||
a) value is in the future
|
||||
b) value is not too distant (e.g., at max. 1 year in the future)
|
||||
|
||||
When the value is 'None', the validation will be skipped. A ValidationError is never issued, but the method returns 'False'.
|
||||
|
||||
options:
|
||||
raise_validation_error: boolean. If set to True, this method issues a marshmallow.ValidationError, when the conditions fail.
|
||||
"""
|
||||
if value is None:
|
||||
return False
|
||||
|
||||
is_in_future = validate_time_is_in_future(value)
|
||||
is_too_distant = validate_time_exceeds_threshold(value, seconds, minutes, hours, days, months)
|
||||
|
||||
if raise_validation_error:
|
||||
if not is_in_future:
|
||||
raise ValidationError(f"The provided value must be in the future. Current Time: {datetime.datetime.now()}, Value: {value}")
|
||||
|
||||
if is_too_distant:
|
||||
raise ValidationError(f"The provided value is in the too distant future and exceeds a threshold for 'reasonable' entries. Found: {value}")
|
||||
|
||||
return is_in_future & (not is_too_distant)
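A brief usage sketch (the offsets are arbitrary examples relative to 'now'):

# sketch: a value a few minutes ahead passes; a past value fails (and would raise if raise_validation_error=True)
soon = datetime.datetime.now() + datetime.timedelta(minutes=11)
past = datetime.datetime.now() - datetime.timedelta(minutes=11)
assert validate_time_is_in_not_too_distant_future(raise_validation_error=False, value=soon) is True
assert validate_time_is_in_not_too_distant_future(raise_validation_error=False, value=past) is False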
|
||||
|
||||
class TimeLogic():
|
||||
def __init__(self):
|
||||
return
|
||||
|
||||
@@ -45,15 +45,6 @@ def get_stub_token():
|
||||
token = user.get("token")
|
||||
return locals()
|
||||
|
||||
def test_():
|
||||
ivs = InputValidationShip()
|
||||
return
|
||||
|
||||
# length: 0 < value < 1000
|
||||
# width: 0 < value < 100
|
||||
|
||||
|
||||
|
||||
def test_input_validation_ship_fails_when_length_is_incorrect():
|
||||
with pytest.raises(ValidationError, match=re.escape("Must be greater than 0 and less than 1000.")):
|
||||
post_data = get_stub_valid_ship()
|
||||
|
||||
398
src/server/tests/validators/test_input_validation_times.py
Normal file
@@ -0,0 +1,398 @@
|
||||
import pytest
|
||||
|
||||
import os
|
||||
import random
|
||||
import datetime
|
||||
from marshmallow import ValidationError
|
||||
|
||||
from BreCal import local_db
|
||||
from BreCal.schemas import model
|
||||
|
||||
from BreCal.schemas.model import ParticipantType
|
||||
from BreCal.validators.input_validation_times import InputValidationTimes
|
||||
|
||||
from BreCal.stubs.times_full import get_valid_stub_times, get_valid_stub_for_pytests
|
||||
|
||||
instance_path = os.path.join(os.path.expanduser('~'), "brecal", "src", "server", "instance", "instance")
|
||||
local_db.initPool(os.path.dirname(instance_path), connection_filename="connection_data_local.json")
|
||||
|
||||
|
||||
def test_input_validation_times_fails_when_berth_info_exceeds_length_limit():
|
||||
# success
|
||||
post_data = get_valid_stub_times()
|
||||
post_data["berth_info"] = "a"*512 # 512 characters
|
||||
model.TimesSchema().load(data=post_data, many=False, partial=True)
|
||||
|
||||
post_data["berth_info"] = "" # 0 characters
|
||||
model.TimesSchema().load(data=post_data, many=False, partial=True)
|
||||
|
||||
# failure
|
||||
with pytest.raises(ValidationError, match="Longer than maximum length 512."):
|
||||
post_data["berth_info"] = "a"*513 # 513 characters
|
||||
model.TimesSchema().load(data=post_data, many=False, partial=True)
|
||||
return
|
||||
|
||||
def test_input_validation_times_fails_when_remarks_exceeds_length_limit():
|
||||
# success
|
||||
post_data = get_valid_stub_times()
|
||||
post_data["remarks"] = "a"*512 # 512 characters
|
||||
model.TimesSchema().load(data=post_data, many=False, partial=True)
|
||||
|
||||
post_data["remarks"] = "" # 0 characters
|
||||
model.TimesSchema().load(data=post_data, many=False, partial=True)
|
||||
|
||||
# failure
|
||||
with pytest.raises(ValidationError, match="Longer than maximum length 512."):
|
||||
post_data["remarks"] = "a"*513 # 513 characters
|
||||
model.TimesSchema().load(data=post_data, many=False, partial=True)
|
||||
return
|
||||
|
||||
def test_input_validation_times_fails_when_participant_type_is_bsmd():
|
||||
# BSMD -> Failure
|
||||
post_data = get_valid_stub_times()
|
||||
post_data["participant_type"] = int(ParticipantType.BSMD)
|
||||
with pytest.raises(ValidationError, match="the participant_type must not be .BSMD"):
|
||||
model.TimesSchema().load(data=post_data, many=False, partial=True)
|
||||
|
||||
# IntFlag property: BSMD & AGENCY -> Failure
|
||||
post_data = get_valid_stub_times()
|
||||
post_data["participant_type"] = int(ParticipantType(ParticipantType.BSMD+ParticipantType.AGENCY))
|
||||
with pytest.raises(ValidationError, match="the participant_type must not be .BSMD"):
|
||||
model.TimesSchema().load(data=post_data, many=False, partial=True)
|
||||
return
|
||||
|
||||
def test_input_validation_times_fails_when_time_key_is_not_reasonable():
|
||||
"""
|
||||
every time key (e.g., 'eta_berth' or 'zone_entry') must be reasonable. The validation expects
|
||||
these values to be 'in the future' (larger than datetime.datetime.now()) and not 'in the too distant future'
|
||||
(e.g., more than one year from now.)
|
||||
"""
|
||||
for time_key in ["eta_berth", "etd_berth", "lock_time", "zone_entry", "operations_start", "operations_end"]:
|
||||
post_data = get_valid_stub_times()
|
||||
|
||||
# success
|
||||
post_data[time_key] = (datetime.datetime.now() + datetime.timedelta(minutes=11)).isoformat()
|
||||
model.TimesSchema().load(data=post_data, many=False, partial=True)
|
||||
|
||||
# fails
|
||||
with pytest.raises(ValidationError, match="The provided value must be in the future."):
|
||||
post_data[time_key] = (datetime.datetime.now() - datetime.timedelta(minutes=11)).isoformat()
|
||||
model.TimesSchema().load(data=post_data, many=False, partial=True)
|
||||
|
||||
# fails
|
||||
with pytest.raises(ValidationError, match="The provided value is in the too distant future and exceeds a threshold for 'reasonable' entries."):
|
||||
post_data[time_key] = (datetime.datetime.now() + datetime.timedelta(days=367)).isoformat()
|
||||
model.TimesSchema().load(data=post_data, many=False, partial=True)
|
||||
return
|
||||
|
||||
def test_input_validation_times_fails_when_user_is_bsmd_user():
|
||||
# create stub-data for a POST request
|
||||
from BreCal.services.jwt_handler import decode_jwt
|
||||
from BreCal.database.sql_utils import get_user_data_for_id
|
||||
import re
|
||||
|
||||
# user 4 is a BSMD user -> fails
|
||||
user_data, loadedModel, content = get_valid_stub_for_pytests(user_id=4)
|
||||
|
||||
with pytest.raises(ValidationError, match=re.escape("current user belongs to BSMD. Cannot post 'times' datasets.")):
|
||||
InputValidationTimes.check_user_is_not_bsmd_type(user_data)
|
||||
|
||||
# user 13 is not a BSMD user -> passes
|
||||
user_data, loadedModel, content = get_valid_stub_for_pytests(user_id=13)
|
||||
|
||||
# success
|
||||
InputValidationTimes.check_user_is_not_bsmd_type(user_data)
|
||||
return
|
||||
|
||||
def test_input_validation_times_fails_when_participant_type_entry_already_exists():
|
||||
# the participant type already has an entry -> fails
|
||||
with pytest.raises(ValidationError, match="A dataset for the participant type is already present. Participant Type:"):
|
||||
user_data, loadedModel, content = get_valid_stub_for_pytests(user_id=3)
|
||||
loadedModel["participant_type"] = int(ParticipantType.AGENCY)
|
||||
|
||||
# 2.) datasets may only be created, if the respective participant type did not already create one.
|
||||
InputValidationTimes.check_if_entry_already_exists_for_participant_type(user_data, loadedModel, content)
|
||||
return
|
||||
|
||||
def test_input_validation_times_fails_when_participant_type_deviates_from_shipcall_participant_map():
|
||||
# success
|
||||
# user id 3 is assigned as participant_type=4, and the stub also assigns participant_type=4
|
||||
user_data, loadedModel, content = get_valid_stub_for_pytests(user_id=3)
|
||||
InputValidationTimes.check_if_user_fits_shipcall_participant_map(user_data, loadedModel, content)
|
||||
|
||||
# fails
|
||||
# user id 4 is assigned as participant_type=1, but the stub assigns participant_type=4
|
||||
with pytest.raises(ValidationError, match="is assigned to the shipcall in a different role."):
|
||||
user_data, loadedModel, content = get_valid_stub_for_pytests(user_id=4)
|
||||
InputValidationTimes.check_if_user_fits_shipcall_participant_map(user_data, loadedModel, content)
|
||||
return
|
||||
|
||||
def test_input_validation_times_fails_when_id_references_do_not_exist():
|
||||
# success: all IDs exist
|
||||
user_data, loadedModel, content = get_valid_stub_for_pytests(user_id=3)
|
||||
InputValidationTimes.check_dataset_references(content)
|
||||
|
||||
# fails: IDs do not exist
|
||||
# iterates once for each, berth_id, shipcall_id, participant_id and generates an artificial, non-existing ID
|
||||
for key in ["berth_id", "shipcall_id", "participant_id"]:
|
||||
user_data, loadedModel, content = get_valid_stub_for_pytests(user_id=3)
|
||||
content[key] = loadedModel[key] = 9912737
|
||||
|
||||
with pytest.raises(ValidationError, match=f"The referenced {key} '{content[key]}' does not exist in the database."):
|
||||
InputValidationTimes.check_dataset_references(content)
|
||||
return
|
||||
|
||||
from BreCal.schemas.model import ParticipantType
|
||||
|
||||
|
||||
def test_input_validation_times_fails_when_missing_required_fields_arrival():
|
||||
"""
|
||||
evaluates every individual combination of arriving shipcalls, where one of the required values is arbitrarily missing.
|
||||
Randomly selects one of the non-terminal ParticipantTypes, which are reasonable (not .BSMD), and validates. This makes sure
|
||||
that over time, every possible combination has been tested.
|
||||
"""
|
||||
# arrival + not-terminal
|
||||
non_terminal_list = [ParticipantType.AGENCY, ParticipantType.MOORING, ParticipantType.PILOT, ParticipantType.PORT_ADMINISTRATION, ParticipantType.TUG]
|
||||
for key in ["eta_berth"]+InputValidationTimes.get_post_data_type_independent_fields():
|
||||
random_participant_type_for_unit_test = random.sample(non_terminal_list,k=1)[0]
|
||||
|
||||
# pass: all required fields exist for the current shipcall type (arrival/incoming)
|
||||
user_data, loadedModel, content = get_valid_stub_for_pytests(user_id=3)
|
||||
loadedModel["shipcall_id"] = content["shipcall_id"] = 222
|
||||
loadedModel["participant_type"] = random_participant_type_for_unit_test
|
||||
content["participant_type"] = int(random_participant_type_for_unit_test)
|
||||
InputValidationTimes.check_times_required_fields_post_data(loadedModel, content)
|
||||
|
||||
# fails: iteratively creates stubs, where one of the required keys is missing
|
||||
user_data, loadedModel, content = get_valid_stub_for_pytests(user_id=3)
|
||||
loadedModel["shipcall_id"] = content["shipcall_id"] = 222
|
||||
loadedModel["participant_type"] = random_participant_type_for_unit_test
|
||||
content["participant_type"] = int(random_participant_type_for_unit_test)
|
||||
with pytest.raises(ValidationError, match="At least one of the required fields is missing. Missing:"):
|
||||
loadedModel[key] = content[key] = None
|
||||
InputValidationTimes.check_times_required_fields_post_data(loadedModel, content)
|
||||
|
||||
# arrival + terminal
|
||||
for key in ["operations_start"]+InputValidationTimes.get_post_data_type_independent_fields():
|
||||
# pass: all required fields exist for the current shipcall type (arrival/incoming)
|
||||
user_data, loadedModel, content = get_valid_stub_for_pytests(user_id=3)
|
||||
loadedModel["shipcall_id"] = content["shipcall_id"] = 222
|
||||
loadedModel["participant_type"] = ParticipantType.TERMINAL
|
||||
content["participant_type"] = int(ParticipantType.TERMINAL)
|
||||
InputValidationTimes.check_times_required_fields_post_data(loadedModel, content)
|
||||
|
||||
# fails: iteratively creates stubs, where one of the required keys is missing
|
||||
user_data, loadedModel, content = get_valid_stub_for_pytests(user_id=3)
|
||||
loadedModel["shipcall_id"] = content["shipcall_id"] = 222
|
||||
loadedModel["participant_type"] = ParticipantType.TERMINAL
|
||||
content["participant_type"] = int(ParticipantType.TERMINAL)
|
||||
|
||||
with pytest.raises(ValidationError, match="At least one of the required fields is missing. Missing:"):
|
||||
loadedModel[key] = content[key] = None
|
||||
InputValidationTimes.check_times_required_fields_post_data(loadedModel, content)
|
||||
return
|
||||
|
||||
def test_input_validation_times_fails_when_missing_required_fields_departure():
|
||||
"""
|
||||
evaluates every individual combination of departing shipcalls, where one of the required values is arbitrarily missing.
|
||||
Randomly selects one of the non-terminal ParticipantTypes, which are reasonable (not .BSMD), and validates. This makes sure
|
||||
that over time, every possible combination has been tested.
|
||||
"""
|
||||
# departure + not-terminal
|
||||
non_terminal_list = [ParticipantType.AGENCY, ParticipantType.MOORING, ParticipantType.PILOT, ParticipantType.PORT_ADMINISTRATION, ParticipantType.TUG]
|
||||
|
||||
for key in ["etd_berth"]+InputValidationTimes.get_post_data_type_independent_fields():
|
||||
# select a *random* participant type, which is reasonable and *not* TERMINAL, and validate the function.
|
||||
random_participant_type_for_unit_test = random.sample(non_terminal_list,k=1)[0]
|
||||
|
||||
# pass
|
||||
user_data, loadedModel, content = get_valid_stub_for_pytests(user_id=3)
|
||||
loadedModel["shipcall_id"] = content["shipcall_id"] = 241
|
||||
loadedModel["participant_type"] = random_participant_type_for_unit_test
|
||||
content["participant_type"] = int(random_participant_type_for_unit_test)
|
||||
InputValidationTimes.check_times_required_fields_post_data(loadedModel, content)
|
||||
|
||||
# fails: iteratively creates stubs, where one of the required keys is missing
|
||||
user_data, loadedModel, content = get_valid_stub_for_pytests(user_id=3)
|
||||
loadedModel["shipcall_id"] = content["shipcall_id"] = 241
|
||||
loadedModel["participant_type"] = random_participant_type_for_unit_test
|
||||
content["participant_type"] = int(random_participant_type_for_unit_test)
|
||||
with pytest.raises(ValidationError, match="At least one of the required fields is missing. Missing:"):
|
||||
loadedModel[key] = content[key] = None
|
||||
InputValidationTimes.check_times_required_fields_post_data(loadedModel, content)
|
||||
|
||||
# departure + terminal
|
||||
for key in ["operations_end"]+InputValidationTimes.get_post_data_type_independent_fields():
|
||||
# pass
|
||||
user_data, loadedModel, content = get_valid_stub_for_pytests(user_id=3)
|
||||
loadedModel["shipcall_id"] = content["shipcall_id"] = 241
|
||||
loadedModel["participant_type"] = ParticipantType.TERMINAL
|
||||
content["participant_type"] = int(ParticipantType.TERMINAL)
|
||||
InputValidationTimes.check_times_required_fields_post_data(loadedModel, content)
|
||||
|
||||
# fails: iteratively creates stubs, where one of the required keys is missing
|
||||
user_data, loadedModel, content = get_valid_stub_for_pytests(user_id=3)
|
||||
loadedModel["shipcall_id"] = content["shipcall_id"] = 241
|
||||
loadedModel["participant_type"] = ParticipantType.TERMINAL
|
||||
content["participant_type"] = int(ParticipantType.TERMINAL)
|
||||
with pytest.raises(ValidationError, match="At least one of the required fields is missing. Missing:"):
|
||||
loadedModel[key] = content[key] = None
|
||||
InputValidationTimes.check_times_required_fields_post_data(loadedModel, content)
|
||||
return
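

# Hedged, illustrative sketch (not part of the original test suite): the departure/shifting tests above
# pick one random non-terminal ParticipantType per run, so every (field, type) pair is only covered
# across many runs. Assuming the helpers above keep their signatures and that the static field helper
# can be called at collection time, the same coverage could be reached deterministically in a single
# run via pytest.mark.parametrize. The test name below is an assumption, not an existing test.
@pytest.mark.parametrize("participant_type", [ParticipantType.AGENCY, ParticipantType.MOORING,
                                              ParticipantType.PILOT, ParticipantType.PORT_ADMINISTRATION,
                                              ParticipantType.TUG])
@pytest.mark.parametrize("key", ["etd_berth"] + InputValidationTimes.get_post_data_type_independent_fields())
def test_departure_required_fields_parametrized_sketch(participant_type, key):
    # build a valid stub for a departing shipcall (id 241), then blank out exactly one required field
    user_data, loadedModel, content = get_valid_stub_for_pytests(user_id=3)
    loadedModel["shipcall_id"] = content["shipcall_id"] = 241
    loadedModel["participant_type"] = participant_type
    content["participant_type"] = int(participant_type)
    with pytest.raises(ValidationError, match="At least one of the required fields is missing. Missing:"):
        loadedModel[key] = content[key] = None
        InputValidationTimes.check_times_required_fields_post_data(loadedModel, content)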


def test_input_validation_times_fails_when_missing_required_fields_shifting():
    """
    Evaluates, for shifting shipcalls, every combination in which one of the required values is missing.
    For the non-terminal cases, a random reasonable ParticipantType (i.e. not .BSMD) is selected per run,
    so that over repeated runs every possible combination is eventually covered.
    """
    # shifting + not-terminal
    non_terminal_list = [ParticipantType.AGENCY, ParticipantType.MOORING, ParticipantType.PILOT, ParticipantType.PORT_ADMINISTRATION, ParticipantType.TUG]
    for key in ["eta_berth", "etd_berth"]+InputValidationTimes.get_post_data_type_independent_fields():
        # select a *random* participant type, which is reasonable and *not* TERMINAL, and validate the function.
        random_participant_type_for_unit_test = random.sample(non_terminal_list, k=1)[0]

        # pass: all required fields exist for the current shipcall type (shifting)
        user_data, loadedModel, content = get_valid_stub_for_pytests(user_id=3)
        loadedModel["shipcall_id"] = content["shipcall_id"] = 189
        loadedModel["participant_type"] = random_participant_type_for_unit_test
        content["participant_type"] = int(random_participant_type_for_unit_test)
        InputValidationTimes.check_times_required_fields_post_data(loadedModel, content)

        # fails: iteratively creates stubs where one of the required keys is missing
        user_data, loadedModel, content = get_valid_stub_for_pytests(user_id=3)
        loadedModel["shipcall_id"] = content["shipcall_id"] = 189
        loadedModel["participant_type"] = random_participant_type_for_unit_test
        content["participant_type"] = int(random_participant_type_for_unit_test)
        with pytest.raises(ValidationError, match="At least one of the required fields is missing. Missing:"):
            loadedModel[key] = content[key] = None
            InputValidationTimes.check_times_required_fields_post_data(loadedModel, content)

    # shifting + terminal
    for key in ["operations_start", "operations_end"]+InputValidationTimes.get_post_data_type_independent_fields():
        # pass: all required fields exist for the current shipcall type (shifting)
        user_data, loadedModel, content = get_valid_stub_for_pytests(user_id=3)
        loadedModel["shipcall_id"] = content["shipcall_id"] = 189
        loadedModel["participant_type"] = ParticipantType.TERMINAL
        content["participant_type"] = int(ParticipantType.TERMINAL)
        InputValidationTimes.check_times_required_fields_post_data(loadedModel, content)

        # fails: iteratively creates stubs where one of the required keys is missing
        user_data, loadedModel, content = get_valid_stub_for_pytests(user_id=3)
        loadedModel["shipcall_id"] = content["shipcall_id"] = 189
        loadedModel["participant_type"] = ParticipantType.TERMINAL
        content["participant_type"] = int(ParticipantType.TERMINAL)
        with pytest.raises(ValidationError, match="At least one of the required fields is missing. Missing:"):
            loadedModel[key] = content[key] = None
            InputValidationTimes.check_times_required_fields_post_data(loadedModel, content)
    return


def test_input_validation_times_fails_when_participant_type_is_not_assigned__or__user_does_not_belong_to_the_same_participant_id():
    """
    There are two failure cases in InputValidationTimes.check_user_belongs_to_same_group_as_dataset_determines:
    1.) the participant type is simply not assigned to the shipcall
    2.) the participant type exists, but the user's participant_id does not match the participant_id assigned to that type

    Test data:
    shipcall_id 222 is assigned to the participants {"participant_id": 136, "type": 2} and {"participant_id": 136, "type": 8}

    Case 1:
    user_id 3 acting as participant_type 4 fails, because type 4 is not assigned to the shipcall.

    Case 2:
    user_id 2 (participant_id 2) acting as participant_type 2 fails even though type 2 exists,
    because that type is assigned to participant_id 136.

    Case 3:
    user_id 28 (participant_id 136) acting as participant_type 2 passes.
    """
    # fails: participant type 4 does not exist
    user_data, loadedModel, content = get_valid_stub_for_pytests(user_id=3)
    participant_type = 4
    loadedModel["shipcall_id"] = content["shipcall_id"] = 222
    loadedModel["participant_id"] = content["participant_id"] = 2
    loadedModel["participant_type"] = content["participant_type"] = participant_type

    with pytest.raises(ValidationError, match=f"Could not find a matching time dataset for the provided participant_type: {participant_type}. Found Time Datasets:"):
        InputValidationTimes.check_user_belongs_to_same_group_as_dataset_determines(user_data, loadedModel=loadedModel, times_id=None)

    # fails: participant type 2 exists, but user_id 2 belongs to the wrong participant_id group (user_id 28 or 29 would be correct)
    user_data, loadedModel, content = get_valid_stub_for_pytests(user_id=2)
    loadedModel["shipcall_id"] = content["shipcall_id"] = 222
    participant_type = 2
    loadedModel["participant_type"] = content["participant_type"] = participant_type
    with pytest.raises(ValidationError, match="The dataset may only be changed by a user belonging to the same participant group as the times dataset is referring to. User participant_id:"):
        InputValidationTimes.check_user_belongs_to_same_group_as_dataset_determines(user_data, loadedModel=loadedModel, times_id=None)

    # pass: participant type 2 exists & user_id 28 is part of participant_id group 136, which is correct
    user_data, loadedModel, content = get_valid_stub_for_pytests(user_id=28)
    loadedModel["shipcall_id"] = content["shipcall_id"] = 222
    participant_type = 2
    loadedModel["participant_type"] = content["participant_type"] = participant_type
    InputValidationTimes.check_user_belongs_to_same_group_as_dataset_determines(user_data, loadedModel=loadedModel, times_id=None)
    return
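

# Hedged, illustrative sketch (not part of the original test suite): the three cases from the docstring
# above expressed as a parametrized table. The user/participant ids are the ones described there and are
# assumptions about the underlying test database; the test name is illustrative.
@pytest.mark.parametrize("user_id, participant_type, should_pass", [
    (3, 4, False),   # Case 1: participant_type 4 is not assigned to shipcall 222
    (2, 2, False),   # Case 2: type 2 exists, but user 2 belongs to participant_id 2 instead of 136
    (28, 2, True),   # Case 3: type 2 exists and user 28 belongs to participant_id 136
])
def test_check_user_belongs_to_same_group_parametrized_sketch(user_id, participant_type, should_pass):
    user_data, loadedModel, content = get_valid_stub_for_pytests(user_id=user_id)
    loadedModel["shipcall_id"] = content["shipcall_id"] = 222
    loadedModel["participant_type"] = content["participant_type"] = participant_type
    if should_pass:
        InputValidationTimes.check_user_belongs_to_same_group_as_dataset_determines(user_data, loadedModel=loadedModel, times_id=None)
    else:
        with pytest.raises(ValidationError):
            InputValidationTimes.check_user_belongs_to_same_group_as_dataset_determines(user_data, loadedModel=loadedModel, times_id=None)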


def test_input_validation_times_put_request_fails_when_id_field_is_missing():
    """used within PUT-requests. When 'id' is missing, a ValidationError is issued"""
    # passes: as an 'id' is provided
    user_data, loadedModel, content = get_valid_stub_for_pytests(user_id=3)
    content["id"] = 379
    InputValidationTimes.check_times_required_fields_put_data(content)

    # fails: 'id' field is missing
    user_data, loadedModel, content = get_valid_stub_for_pytests(user_id=3)
    content.pop("id", None)
    with pytest.raises(ValidationError, match="A PUT-request requires an 'id' reference, which was not found."):
        InputValidationTimes.check_times_required_fields_put_data(content)
    return


def test_input_validation_times_delete_request_fails_when_times_id_is_deleted_already():
    """used within DELETE-requests. When the referenced time entry is already deleted, a ValidationError is issued"""
    # passes: entry exists and is not deleted
    times_id = 379
    InputValidationTimes.check_if_entry_is_already_deleted(times_id)

    # passes: entry exists and is not deleted
    times_id = 391
    InputValidationTimes.check_if_entry_is_already_deleted(times_id)

    # fails: entry is already deleted
    times_id = 11
    with pytest.raises(ValidationError, match=f"The selected time entry is already deleted. ID: {times_id}"):
        InputValidationTimes.check_if_entry_is_already_deleted(times_id)

    # fails: entry is already deleted
    times_id = 4
    with pytest.raises(ValidationError, match=f"The selected time entry is already deleted. ID: {times_id}"):
        InputValidationTimes.check_if_entry_is_already_deleted(times_id)
    return
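

# Hedged, illustrative sketch (not part of the original test suite): the same already-deleted checks as a
# parametrized table. The ids refer to the fixture data used by the test above; the test name is illustrative.
@pytest.mark.parametrize("times_id, already_deleted", [
    (379, False),
    (391, False),
    (11, True),
    (4, True),
])
def test_check_if_entry_is_already_deleted_parametrized_sketch(times_id, already_deleted):
    if already_deleted:
        with pytest.raises(ValidationError, match=f"The selected time entry is already deleted. ID: {times_id}"):
            InputValidationTimes.check_if_entry_is_already_deleted(times_id)
    else:
        InputValidationTimes.check_if_entry_is_already_deleted(times_id)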


def test_input_validation_times_delete_request_fails_when_times_id_does_not_exist_():
    """used within DELETE-requests. When the referenced times_id cannot be found, a ValidationError is issued"""
    # passes: times_id exists
    user_data, loadedModel, content = get_valid_stub_for_pytests(user_id=28)
    times_id = 392
    InputValidationTimes.check_user_belongs_to_same_group_as_dataset_determines(user_data, loadedModel=None, times_id=times_id)

    # fails: times_id does not exist
    user_data, loadedModel, content = get_valid_stub_for_pytests(user_id=28)
    times_id = 4
    with pytest.raises(ValidationError, match=f"Unknown times_id. Could not find a matching entry for ID: {times_id}"):
        InputValidationTimes.check_user_belongs_to_same_group_as_dataset_determines(user_data, loadedModel=None, times_id=times_id)
    return


def test_input_validation_times_delete_request_fails_when_user_belongs_to_wrong_participant_id():
    """used within DELETE-requests. The entry may only be deleted by a user of the participant group the entry refers to"""
    # fails: participant_id should be 136, but user_id=3 belongs to participant_id=2
    user_data, loadedModel, content = get_valid_stub_for_pytests(user_id=3)
    times_id = 392

    with pytest.raises(ValidationError, match="The dataset may only be changed by a user belonging to the same participant group as the times dataset is referring to. User participant_id:"):
        InputValidationTimes.check_user_belongs_to_same_group_as_dataset_determines(user_data, loadedModel=None, times_id=times_id)

    # passes: participant_id should be 136, and user_id=28 belongs to participant_id=136
    user_data, loadedModel, content = get_valid_stub_for_pytests(user_id=28)
    times_id = 392
    InputValidationTimes.check_user_belongs_to_same_group_as_dataset_determines(user_data, loadedModel=None, times_id=times_id)
    return