added a reference check to PUT-requests (shipcall), so each provided ID must exist. Corrected some input validation functions to solve known bugs

This commit is contained in:
Max Metz 2024-08-12 19:43:19 +02:00
parent 7e6252880d
commit e526337c6a
8 changed files with 111 additions and 41 deletions

View File

@ -88,6 +88,9 @@ def execute_sql_query_standalone(query, param={}, pooledConnection=None, model=N
if schemas is sentinel: if schemas is sentinel:
raise Exception("no such record") raise Exception("no such record")
elif command_type=="execute_scalar":
schemas = commands.execute_scalar(query)
else: else:
raise ValueError(command_type) raise ValueError(command_type)

View File

@ -110,8 +110,8 @@ class History:
"shipcall_id": self.shipcall_id, "shipcall_id": self.shipcall_id,
"timestamp": self.timestamp.isoformat() if self.timestamp else "", "timestamp": self.timestamp.isoformat() if self.timestamp else "",
"eta": self.eta.isoformat() if self.eta else "", "eta": self.eta.isoformat() if self.eta else "",
"type": self.type.name, "type": self.type.name if isinstance(self.type, IntEnum) else ObjectType(self.type).name,
"operation": self.operation.name "operation": self.operation.name if isinstance(self.operation, IntEnum) else OperationType(self.operation).name
} }
@classmethod @classmethod
@ -146,7 +146,7 @@ class Notification:
"id": self.id, "id": self.id,
"shipcall_id": self.shipcall_id, "shipcall_id": self.shipcall_id,
"level": self.level, "level": self.level,
"type": self.type.name, "type": self.type.name if isinstance(self.type, IntEnum) else NotificationType(self.type).name,
"message": self.message, "message": self.message,
"created": self.created.isoformat() if self.created else "", "created": self.created.isoformat() if self.created else "",
"modified": self.modified.isoformat() if self.modified else "" "modified": self.modified.isoformat() if self.modified else ""
@ -309,7 +309,7 @@ class Shipcall:
return { return {
"id": self.id, "id": self.id,
"ship_id": self.ship_id, "ship_id": self.ship_id,
"type": self.type.name, "type": self.type.name if isinstance(self.type, IntEnum) else ShipcallType(self.type).name,
"eta": self.eta.isoformat() if self.eta else "", "eta": self.eta.isoformat() if self.eta else "",
"voyage": self.voyage, "voyage": self.voyage,
"etd": self.etd.isoformat() if self.etd else "", "etd": self.etd.isoformat() if self.etd else "",
@ -330,7 +330,7 @@ class Shipcall:
"anchored": self.anchored, "anchored": self.anchored,
"moored_lock": self.moored_lock, "moored_lock": self.moored_lock,
"canceled": self.canceled, "canceled": self.canceled,
"evaluation": self.evaluation.name, "evaluation": self.evaluation.name if isinstance(self.evaluation, IntEnum) else EvaluationType(self.evaluation).name,
"evaluation_message": self.evaluation_message, "evaluation_message": self.evaluation_message,
"evaluation_time": self.evaluation_time.isoformat() if self.evaluation_time else "", "evaluation_time": self.evaluation_time.isoformat() if self.evaluation_time else "",
"evaluation_notifications_sent": self.evaluation_notifications_sent, "evaluation_notifications_sent": self.evaluation_notifications_sent,
@ -549,7 +549,9 @@ class ShipSchema(Schema):
@validates("name") @validates("name")
def validate_name(self, value): def validate_name(self, value):
character_length = len(str(value)) character_length = len(str(value))
if character_length>=64: if character_length<1:
raise ValidationError(f"'name' argument should have at least one character")
elif character_length>=64:
raise ValidationError(f"'name' argument should have at max. 63 characters") raise ValidationError(f"'name' argument should have at max. 63 characters")
if check_if_string_has_special_characters(value): if check_if_string_has_special_characters(value):
@ -609,7 +611,7 @@ class ShipcallParticipantMap:
"id": self.id, "id": self.id,
"shipcall_id": self.shipcall_id, "shipcall_id": self.shipcall_id,
"participant_id": self.participant_id, "participant_id": self.participant_id,
"type": self.type.name, "type": self.type.name if isinstance(self.type, IntEnum) else ShipcallType(self.type).name,
"created": self.created.isoformat() if self.created else "", "created": self.created.isoformat() if self.created else "",
"modified": self.modified.isoformat() if self.modified else "", "modified": self.modified.isoformat() if self.modified else "",
} }

View File

@ -162,8 +162,8 @@ def get_stub_valid_shipcall_departure():
return post_data return post_data
def get_stub_valid_shipcall_shifting(): def get_stub_valid_shipcall_shifting():
eta = (datetime.datetime.now()+datetime.timedelta(minutes=45)).isoformat() etd = (datetime.datetime.now()+datetime.timedelta(minutes=45)).isoformat()
etd = (datetime.datetime.now()+datetime.timedelta(minutes=60)).isoformat() eta = (datetime.datetime.now()+datetime.timedelta(minutes=60)).isoformat()
post_data = { post_data = {
**get_stub_valid_shipcall_base(), **get_stub_valid_shipcall_base(),

View File

@ -86,6 +86,9 @@ class InputValidationShipcall():
# the ID field is required, all missing fields will be ignored in the update # the ID field is required, all missing fields will be ignored in the update
InputValidationShipcall.check_required_fields_of_put_request(content) InputValidationShipcall.check_required_fields_of_put_request(content)
# check the referenced IDs
InputValidationShipcall.check_referenced_ids(loadedModel)
# check for reasonable values in the shipcall fields and checks for forbidden keys. # check for reasonable values in the shipcall fields and checks for forbidden keys.
InputValidationShipcall.check_shipcall_values(loadedModel, content, forbidden_keys=["evaluation", "evaluation_message"]) InputValidationShipcall.check_shipcall_values(loadedModel, content, forbidden_keys=["evaluation", "evaluation_message"])
@ -385,8 +388,8 @@ class InputValidationShipcall():
if (not eta > time_now) or (not etd > time_now): if (not eta > time_now) or (not etd > time_now):
raise ValidationError(f"'eta' and 'etd' must be in the future. Incorrect datetime provided. Current Time: {time_now}. ETA: {eta}. ETD: {etd}") raise ValidationError(f"'eta' and 'etd' must be in the future. Incorrect datetime provided. Current Time: {time_now}. ETA: {eta}. ETD: {etd}")
if (not etd > eta): if (not etd < eta):
raise ValidationError(f"'etd' must be larger than 'eta'. The ship cannot depart, before it has arrived. Found: ETA {eta}, ETD: {etd}") raise ValidationError(f"The estimated time of departure ('etd') must take place *before the estimated time of arrival ('eta'). The ship cannot arrive, before it has departed. Found: ETD: {etd}, ETA: {eta}")
if (eta is not None and etd is None) or (eta is None and etd is not None): if (eta is not None and etd is None) or (eta is None and etd is not None):
raise ValidationError(f"'eta' and 'etd' must both be provided when the shipcall type is 'shifting'.") raise ValidationError(f"'eta' and 'etd' must both be provided when the shipcall type is 'shifting'.")
@ -466,6 +469,7 @@ class InputValidationShipcall():
# use the decoded JWT token and extract the participant type & participant id # use the decoded JWT token and extract the participant type & participant id
participant_id = user_data.get("participant_id") participant_id = user_data.get("participant_id")
participant_type = get_participant_type_from_user_data(user_data) participant_type = get_participant_type_from_user_data(user_data)
user_is_bsmd = (ParticipantType.BSMD in participant_type)
# get the shipcall id # get the shipcall id
shipcall_id = loadedModel.get("id") shipcall_id = loadedModel.get("id")
@ -478,13 +482,12 @@ class InputValidationShipcall():
else: else:
assigned_agency = [spm for spm in shipcall_participant_map if int(spm.type) == int(ParticipantType.AGENCY)] assigned_agency = [spm for spm in shipcall_participant_map if int(spm.type) == int(ParticipantType.AGENCY)]
if len(assigned_agency)==0: an_agency_is_assigned = len(assigned_agency)==1
raise ValidationError(f"There is no assigned agency for the shipcall with ID {shipcall_id}.") if len(assigned_agency)>1:
elif len(assigned_agency)>1:
raise ValidationError(f"Internal error? Found more than one assigned agency for the shipcall with ID {shipcall_id}. Found: {assigned_agency}") raise ValidationError(f"Internal error? Found more than one assigned agency for the shipcall with ID {shipcall_id}. Found: {assigned_agency}")
else: if an_agency_is_assigned:
# Agency assigned? User must belong to the assigned agency or be a BSMD user, in case the flag is set
assigned_agency = assigned_agency[0] assigned_agency = assigned_agency[0]
# determine, whether the assigned agency has set the BSMD-flag to allow BSMD users to edit their assigned shipcalls # determine, whether the assigned agency has set the BSMD-flag to allow BSMD users to edit their assigned shipcalls
@ -497,7 +500,6 @@ class InputValidationShipcall():
### USER authority ### ### USER authority ###
# determine, whether the user is a) the assigned agency or b) a BSMD participant # determine, whether the user is a) the assigned agency or b) a BSMD participant
user_is_assigned_agency = (participant_id == assigned_agency.participant_id) user_is_assigned_agency = (participant_id == assigned_agency.participant_id)
user_is_bsmd = (ParticipantType.BSMD in participant_type)
# when the BSMD flag is set: the user must be either BSMD or the assigned agency # when the BSMD flag is set: the user must be either BSMD or the assigned agency
# when the BSMD flag is not set: the user must be the assigned agency # when the BSMD flag is not set: the user must be the assigned agency
@ -505,6 +507,14 @@ class InputValidationShipcall():
if not user_is_authorized: if not user_is_authorized:
raise werkzeug.exceptions.Forbidden(f"PUT Requests for shipcalls can only be issued by an assigned AGENCY or BSMD users (if the special-flag is enabled). Assigned Agency: {assigned_agency} with Flags: {agency_participant.flags}") # Forbidden: 403 raise werkzeug.exceptions.Forbidden(f"PUT Requests for shipcalls can only be issued by an assigned AGENCY or BSMD users (if the special-flag is enabled). Assigned Agency: {assigned_agency} with Flags: {agency_participant.flags}") # Forbidden: 403
else:
# when there is no assigned agency, only BSMD users can update the shipcall
user_is_authorized = user_is_bsmd
if not user_is_authorized:
raise werkzeug.exceptions.Forbidden(f"PUT Requests for shipcalls can only be issued by an assigned AGENCY or BSMD users (if the special-flag is enabled). There is no assigned agency yet, so only BSMD users can change datasets.") # part of a pytest.raises. Forbidden: 403
return return

View File

@ -383,7 +383,6 @@ class InputValidationTimes():
# perform an SQL query. Creates a pooled connection internally, queries the database, then closes the connection. # perform an SQL query. Creates a pooled connection internally, queries the database, then closes the connection.
query = "SELECT participant_id, participant_type, shipcall_id FROM times WHERE id = ?id?" query = "SELECT participant_id, participant_type, shipcall_id FROM times WHERE id = ?id?"
pdata = execute_sql_query_standalone(query=query, param={"id":times_id}, pooledConnection=None) pdata = execute_sql_query_standalone(query=query, param={"id":times_id}, pooledConnection=None)
print(pdata)
# extracts the participant_id from the first matching entry, if applicable # extracts the participant_id from the first matching entry, if applicable
if not len(pdata)>0: if not len(pdata)>0:

View File

@ -265,3 +265,11 @@ def test_input_validation_ship_post_failure_case_20240802():
post_data["bollard_pull"] = None post_data["bollard_pull"] = None
InputValidationShip.evaluate_post_data(user_data, loadedModel, content) InputValidationShip.evaluate_post_data(user_data, loadedModel, content)
return return
def test_input_validation_ship_post_failure_when_name_empty():
# success
post_data = get_stub_valid_ship()
post_data["name"] = "" # empty name
with pytest.raises(ValidationError, match="'name' argument should have at least one character"):
loadedModel = model.ShipSchema().load(data=post_data, many=False, partial=True)
return

View File

@ -1,6 +1,7 @@
import pytest import pytest
import os import os
import re
import jwt import jwt
import json import json
import requests import requests
@ -271,16 +272,16 @@ def test_shipcall_post_request_fails_when_type_shifting_and_not_in_future(get_st
# accept # accept
post_data = original_post_data.copy() post_data = original_post_data.copy()
post_data["type"] = ShipcallType.shifting.name post_data["type"] = ShipcallType.shifting.name
post_data["eta"] = (datetime.datetime.now() + datetime.timedelta(hours=3)).isoformat() post_data["etd"] = (datetime.datetime.now() + datetime.timedelta(hours=3)).isoformat()
post_data["etd"] = (datetime.datetime.now() + datetime.timedelta(hours=3,minutes=1)).isoformat() post_data["eta"] = (datetime.datetime.now() + datetime.timedelta(hours=3,minutes=1)).isoformat()
response = requests.post(f"{url}/shipcalls", headers={"Content-Type":"text", "Authorization":f"Bearer {token}"}, json=post_data) response = requests.post(f"{url}/shipcalls", headers={"Content-Type":"text", "Authorization":f"Bearer {token}"}, json=post_data)
assert response.status_code == 201 assert response.status_code == 201
# error # error
post_data = original_post_data.copy() post_data = original_post_data.copy()
post_data["type"] = ShipcallType.shifting.name post_data["type"] = ShipcallType.shifting.name
post_data["eta"] = (datetime.datetime.now() + datetime.timedelta(hours=3)).isoformat() post_data["etd"] = (datetime.datetime.now() - datetime.timedelta(hours=3)).isoformat()
post_data["etd"] = (datetime.datetime.now() - datetime.timedelta(hours=3,minutes=1)).isoformat() post_data["eta"] = (datetime.datetime.now() + datetime.timedelta(hours=3,minutes=1)).isoformat()
response = requests.post(f"{url}/shipcalls", headers={"Content-Type":"text", "Authorization":f"Bearer {token}"}, json=post_data) response = requests.post(f"{url}/shipcalls", headers={"Content-Type":"text", "Authorization":f"Bearer {token}"}, json=post_data)
with pytest.raises(ValidationError, match="must be in the future. Incorrect datetime provided"): with pytest.raises(ValidationError, match="must be in the future. Incorrect datetime provided"):
@ -446,13 +447,14 @@ def test_shipcall_post_invalid_etd_smaller_than_eta(get_stub_token):
url, token = get_stub_token["url"], get_stub_token["token"] url, token = get_stub_token["url"], get_stub_token["token"]
post_data = get_stub_valid_shipcall_shifting() post_data = get_stub_valid_shipcall_shifting()
post_data["etd"] = (datetime.datetime.fromisoformat(post_data["eta"])-datetime.timedelta(minutes=1)).isoformat() post_data["eta"] = (datetime.datetime.fromisoformat(post_data["etd"])-datetime.timedelta(minutes=1)).isoformat()
response = requests.post( response = requests.post(
f"{url}/shipcalls", headers={"Content-Type":"text", "Authorization":f"Bearer {token}"}, json=post_data f"{url}/shipcalls", headers={"Content-Type":"text", "Authorization":f"Bearer {token}"}, json=post_data
) )
validation_error_default_asserts(response) validation_error_default_asserts(response)
assert "\'etd\' must be larger than \'eta\'. " in response.json().get("message","") assert "The estimated time of departure ('etd') must take place *before the estimated time of arrival ('eta')" in response.json().get("message","")
#assert "\'etd\' must be larger than \'eta\'. " in response.json().get("message","")
return return
def test_shipcall_post_invalid_eta_and_etd_must_be_in_future(get_stub_token): def test_shipcall_post_invalid_eta_and_etd_must_be_in_future(get_stub_token):
@ -682,7 +684,7 @@ def test_shipcall_put_request_fails_when_no_agency_is_assigned(get_shipcall_id_a
post_data = get_stub_valid_shipcall_arrival() post_data = get_stub_valid_shipcall_arrival()
post_data["id"] = shipcall_id post_data["id"] = shipcall_id
user_data = {'id':6, 'participant_id':1} user_data = {'id':6, 'participant_id':2} # participant_id 2 is not BSMD and is not authorized.
loadedModel = post_data loadedModel = post_data
content = post_data content = post_data
@ -703,7 +705,8 @@ def test_shipcall_put_request_fails_when_no_agency_is_assigned(get_shipcall_id_a
# no agency assigned # no agency assigned
ivs = InputValidationShipcall() ivs = InputValidationShipcall()
with pytest.raises(ValidationError, match=f"There is no assigned agency for the shipcall with ID"):
with pytest.raises(werkzeug.exceptions.Forbidden, match=re.escape(f"PUT Requests for shipcalls can only be issued by an assigned AGENCY or BSMD users (if the special-flag is enabled). There is no assigned agency yet, so only BSMD users can change datasets.")):
ivs.check_user_is_authorized_for_put_request(user_data, loadedModel, content, spm_shipcall_data) ivs.check_user_is_authorized_for_put_request(user_data, loadedModel, content, spm_shipcall_data)
return return
@ -766,7 +769,7 @@ def test_shipcall_put_request_fails_when_user_tries_self_assignment(get_shipcall
# self-assignment. User is participant 6, and wants to assign participant 6. # self-assignment. User is participant 6, and wants to assign participant 6.
ivs = InputValidationShipcall() ivs = InputValidationShipcall()
with pytest.raises(ValidationError, match=f"There is no assigned agency for the shipcall with ID"): with pytest.raises(werkzeug.exceptions.Forbidden, match=re.escape("PUT Requests for shipcalls can only be issued by an assigned AGENCY or BSMD users (if the special-flag is enabled). There is no assigned agency yet, so only BSMD users can change datasets.")):
# previous error message: An agency cannot self-register for a shipcall. The request is issued by an agency-user and tries to assign an AGENCY as the participant of the shipcall."" # previous error message: An agency cannot self-register for a shipcall. The request is issued by an agency-user and tries to assign an AGENCY as the participant of the shipcall.""
# however, self-assignment is no longer possible, because the SPM is verified beforehand. # however, self-assignment is no longer possible, because the SPM is verified beforehand.
ivs.check_user_is_authorized_for_put_request(user_data, loadedModel, content, spm_shipcall_data) ivs.check_user_is_authorized_for_put_request(user_data, loadedModel, content, spm_shipcall_data)
@ -833,8 +836,8 @@ def test_shipcall_put_request_fails_input_validation_shipcall_when_shipcall_is_c
content = put_data content = put_data
# eta & etd must be in the future, as the request fails otherwise. The shipcall is 'shifting', so both must be provided. # eta & etd must be in the future, as the request fails otherwise. The shipcall is 'shifting', so both must be provided.
loadedModel["eta"] = datetime.datetime.now()+datetime.timedelta(minutes=1) loadedModel["etd"] = datetime.datetime.now()+datetime.timedelta(minutes=1)
loadedModel["etd"] = datetime.datetime.now()+datetime.timedelta(minutes=2) loadedModel["eta"] = datetime.datetime.now()+datetime.timedelta(minutes=2)
### FAILS: ### FAILS:
# user 9 (participant id 4) is *not* assigned to the shipcall # user 9 (participant id 4) is *not* assigned to the shipcall
@ -855,3 +858,46 @@ def test_shipcall_put_request_fails_input_validation_shipcall_when_shipcall_is_c
### verification should pass ### verification should pass
InputValidationShipcall.evaluate_put_data(user_data, loadedModel, content) InputValidationShipcall.evaluate_put_data(user_data, loadedModel, content)
return return
def test_post_data_with_valid_data(get_stub_token):
"""This unit test uses the input data from
# https://trello.com/c/VXVSLTF4/267-shipcall-anlegen-shifting-erh%C3%A4lt-fehler-aufgrund-fr%C3%BCherem-etd-als-eta
to make sure, the failure case no longer appears.
"""
url, token = get_stub_token["url"], get_stub_token["token"]
post_data = {
"arrival_berth_id": 167,
"created": "2024-08-08T17:20:00",
"departure_berth_id": 167,
"eta": "2024-08-18T18:18:09.174",
"etd": "2024-08-16T18:18:09.174",
"participants": [
{
"participant_id": 136,
"type": 8
},
{
"participant_id": 11,
"type": 32
},
{
"participant_id": 1,
"type": 1
}
],
"ship_id": 12, # originally used ship_id 18. Had to change to a different ship id, so the ship is known.
"type": "shifting"
}
# test 1: shipcall schema
model.ShipcallSchema().load(data=post_data, many=False, partial=True)
# test 2: post request with OKAY status_code
response = requests.post(
f"{url}/shipcalls", headers={"Content-Type":"text", "Authorization":f"Bearer {token}"}, json=post_data
)
assert response.status_code==201
return

View File

@ -245,6 +245,8 @@ def test_input_validation_times_fails_when_missing_required_fields_shifting():
evaluates every individual combination of shifting shipcalls, where one of the required values is arbitrarily missing evaluates every individual combination of shifting shipcalls, where one of the required values is arbitrarily missing
randomly selects one of the non-terminal ParticipantTypes, which are reasonable (not .BSMD), and validates. This makes sure, randomly selects one of the non-terminal ParticipantTypes, which are reasonable (not .BSMD), and validates. This makes sure,
that over time, every possible combination has been tested. that over time, every possible combination has been tested.
Due to the amount of combinations, this test is rather slow. As off 12.08.2024, the test took 10.585362434387207 [s]
""" """
# shifting + not-terminal # shifting + not-terminal
non_terminal_list = [ParticipantType.AGENCY, ParticipantType.MOORING, ParticipantType.PILOT, ParticipantType.PORT_ADMINISTRATION, ParticipantType.TUG] non_terminal_list = [ParticipantType.AGENCY, ParticipantType.MOORING, ParticipantType.PILOT, ParticipantType.PORT_ADMINISTRATION, ParticipantType.TUG]