implemented PUT-request validation. It validates the user's authority, as well as the respective PUT data. Both, POST-request and PUT-request, of shipcalls are now input-validated.

This commit is contained in:
Max Metz 2024-05-27 15:00:00 +02:00
parent 7c8cd3763a
commit 95c771b7d1
6 changed files with 272 additions and 17 deletions

View File

@ -6,10 +6,12 @@ from .. import impl
from ..services.auth_guard import auth_guard, check_jwt
from BreCal.validators.input_validation import validate_posted_shipcall_data, check_if_user_is_bsmd_type
from BreCal.validators.input_validation_shipcall import InputValidationShipcall
from BreCal.database.sql_handler import execute_sql_query_standalone
import logging
import json
import traceback
import werkzeug
bp = Blueprint('shipcalls', __name__)
@ -47,8 +49,7 @@ def PostShipcalls():
# read the user data from the JWT token (set when login is performed)
user_data = check_jwt()
# validate the posted shipcall data
# validate_posted_shipcall_data(user_data, loadedModel, content)
# validate the posted shipcall data & the user's authority
InputValidationShipcall.evaluate_post_data(user_data, loadedModel, content)
except ValidationError as ex:
@ -76,17 +77,18 @@ def PutShipcalls():
# read the user data from the JWT token (set when login is performed)
user_data = check_jwt()
# check, whether the user belongs to a participant, which is of type ParticipantType.BSMD
# as ParticipantType is an IntFlag, a user belonging to multiple groups is properly evaluated.
is_bsmd = check_if_user_is_bsmd_type(user_data)
if not is_bsmd:
raise ValidationError(f"current user does not belong to BSMD. Cannot post shipcalls. Found user data: {user_data}")
# InputValidationShipcall.evaluate_put_data(user_data, loadedModel, content)
# validate the PUT shipcall data and the user's authority
InputValidationShipcall.evaluate_put_data(user_data, loadedModel, content)
except ValidationError as ex:
logging.error(ex)
print(ex)
return json.dumps({"message":f"bad format. \nError Messages: {ex.messages}. \nValid Data: {ex.valid_data}"}), 400
except werkzeug.exceptions.Forbidden as ex:
logging.error(ex)
print(ex)
return json.dumps({"message":ex.description}), 403
except Exception as ex:
logging.error(ex)

View File

@ -1,9 +1,11 @@
import numpy as np
import pandas as pd
import pydapper
import datetime
import typing
from BreCal.schemas.model import Shipcall, Ship, Participant, Berth, User, Times, ShipcallParticipantMap
from BreCal.database.enums import ParticipantType
from BreCal.local_db import getPoolConnection
def pandas_series_to_data_model():
return
@ -51,6 +53,29 @@ def get_synchronous_shipcall_times_standalone(query_time:pd.Timestamp, all_df_ti
counts = sum(time_deltas_filtered) # int
return counts
def execute_sql_query_standalone(query, param, pooledConnection=None):
"""
execute an arbitrary query with a set of parameters, return the output and convert it to a list.
when the pooled connection is rebuilt, it will be closed at the end of the function.
"""
rebuild_pooled_connection = pooledConnection is None
if rebuild_pooled_connection:
pooledConnection = getPoolConnection()
commands = pydapper.using(pooledConnection)
# participant_query = "SELECT participant_id, type FROM shipcall_participant_map WHERE shipcall_id=?shipcall_id?";
# creates a generator
schemas = commands.query(query, model=dict, param=param, buffered=False)
# creates a list of results from the generator
schemas = [schema for schema in schemas]
if rebuild_pooled_connection:
pooledConnection.close()
return schemas
class SQLHandler():
"""
An object that reads SQL queries from the sql_connection and stores it in pandas DataFrames. The object can read all available tables
@ -66,6 +91,15 @@ class SQLHandler():
if read_all:
self.read_all(self.all_schemas)
def execute_sql_query(self, sql_connection, query, param):
"""
this method is best used in combination with a python context-manager, such as:
with mysql.connector.connect(**mysql_connection_data) as sql_connection:
schema = sql_handler.execute_sql_query(sql_connection, query)
"""
schemas = execute_sql_query_standalone(query, param, pooledConnection=sql_connection)
return schemas
def get_all_schemas_from_mysql(self):
with self.sql_connection.cursor(buffered=True) as cursor:
cursor.execute("SHOW TABLES")

View File

@ -48,6 +48,12 @@ def GetShipcalls(options):
def PostShipcalls(schemaModel):
"""
This function *executes* a post-request for shipcalls. The function is accessible as part of an API route.
The common sequence is:
a) issue a request to the Flask API
b) BreCal.api.shipcalls.PostShipcalls, to verify the incoming request (which includes an authentification guard)
c) BreCal.impl.shipcalls.PostShipcalls, to execute the incoming request
:param schemaModel: The deserialized dict of the request
e.g.,
@ -57,7 +63,6 @@ def PostShipcalls(schemaModel):
'pier_side': False, 'bunkering': True, 'recommended_tugs': 2, 'type_value': 1, 'evaluation_value': 0}
}
"""
# TODO: Validate the upload data
# This creates a *new* entry
try:
@ -180,6 +185,8 @@ def PutShipcalls(schemaModel):
pooledConnection = local_db.getPoolConnection()
commands = pydapper.using(pooledConnection)
user_data = check_jwt()
# test if object to update is found
sentinel = object()
@ -263,7 +270,6 @@ def PutShipcalls(schemaModel):
# save history data
# TODO: set ETA properly
user_data = check_jwt()
# query = create_sql_query_history_put()
query = "INSERT INTO history (participant_id, shipcall_id, user_id, timestamp, eta, type, operation) VALUES (?pid?, ?scid?, ?uid?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, 1, 2)"
commands.execute(query, {"scid" : schemaModel["id"], "pid" : user_data["participant_id"], "uid" : user_data["id"]})

View File

@ -1,3 +1,4 @@
import typing
import json
import datetime
from abc import ABC, abstractmethod
@ -11,7 +12,8 @@ from BreCal.impl.berths import GetBerths
from BreCal.database.enums import ParticipantType, ParticipantFlag
from BreCal.validators.input_validation_utils import check_if_user_is_bsmd_type, check_if_ship_id_is_valid, check_if_berth_id_is_valid, check_if_participant_ids_are_valid, check_if_participant_ids_and_types_are_valid, check_if_string_has_special_characters, get_shipcall_id_dictionary, get_participant_type_from_user_data, check_if_int_is_valid_flag
from BreCal.database.sql_handler import execute_sql_query_standalone
import werkzeug
class InputValidationShipcall():
"""
@ -67,18 +69,20 @@ class InputValidationShipcall():
5. a canceled shipcall may not be changed
"""
# check for permission (only BSMD-type participants)
# #TODO: are both, bsmd and agency, user types accepted?
InputValidationShipcall.check_user_is_bsmd_type(user_data)
# check, whether an agency is listed in the shipcall-participant-map
# InputValidationShipcall.check_agency_in_shipcall_participant_map() # args?
InputValidationShipcall.check_agency_in_shipcall_participant_map(user_data, loadedModel, content)
# the ID field is required, all missing fields will be ignored in the update
InputValidationShipcall.check_required_fields_of_put_request(content)
# check for reasonable values in the shipcall fields and checks for forbidden keys. Note: 'canceled' is allowed in PUT-requests.
# check for reasonable values in the shipcall fields and checks for forbidden keys.
InputValidationShipcall.check_shipcall_values(loadedModel, content, forbidden_keys=["evaluation", "evaluation_message"])
# a canceled shipcall cannot be selected
# Note: 'canceled' is allowed in PUT-requests, if it is not already set (which is checked by InputValidationShipcall.check_shipcall_is_cancel)
InputValidationShipcall.check_shipcall_is_canceled(loadedModel, content)
return
@ -113,7 +117,65 @@ class InputValidationShipcall():
return
@staticmethod
def check_agency_in_shipcall_participant_map(): # args?
def check_agency_in_shipcall_participant_map(user_data:dict, loadedModel:dict, content:dict, spm_shipcall_data:typing.Optional[list]=None):
"""
When the request is issued by a user of type 'AGENCY', there must be special caution. Agency users cannot self-assign as participants
of a shipcall. Further, when no AGENCY is assigned to the shipcall, a PUT-request is not feasible. In those cases, the
BSMD must first assign an agency, before a PUT-request can assign further participants.
Upon violation, this method issues 'Forbidden'-Exceptions with HTTP status code 403. There are four reasons for violations:
a) an agency tries to self-assign for a shipcall
b) there is no assigned agency for the current shipcall
c) an agency is assigned, but the current agency-user belongs to a different participant_id
d) the user must be of ParticipantType BSMD or AGENCY
args:
spm_shipcall_data:
a list of entries obtained from the ShipcallParticipantMap. These are deserialized dictionaries.
e.g., [{'participant_id': 136, 'type': 8}, ]
"""
if spm_shipcall_data is None:
# read the ShipcallParticipantMap entry of the current shipcall_id. This is used within the input validation of a PUT request
spm_shipcall_data = execute_sql_query_standalone(
query = "SELECT participant_id, type FROM shipcall_participant_map WHERE shipcall_id=?shipcall_id?",
param={"shipcall_id":loadedModel["id"]},
pooledConnection=None
)
# which role should be set by the PUT request? If the agency is about to be set, an error will be created
# read the user data from the JWT token (set when login is performed)
user_type = get_participant_type_from_user_data(user_data) # decode JWT -> get 'type' value
# select the matching entries from the ShipcallParticipantMap
agency_entries = [spm_entry for spm_entry in spm_shipcall_data if int(spm_entry.get("type"))==int(ParticipantType.AGENCY)] # find all entries of type AGENCY (there should be at max. 1)
# when the request stems from an AGENCY user, and the user wants to PUT an AGENCY role, the request should fail
# boolean: check, whether any of the assigned participants is of type AGENCY
types = [participant.get("type") for participant in loadedModel["participants"]] # readout the participants from the loadedModel, which shall be assigned by the PUT request
any_type_is_agency = any([int(type_) == int(ParticipantType.AGENCY) for type_ in types]) # check, whether *any* of the participants is an agency
if not (int(user_type) in [int(ParticipantType.AGENCY), int(ParticipantType.BSMD)]):
# user not AGENCY or BSMD
raise werkzeug.exceptions.Forbidden(f"PUT Requests for shipcalls can only be issued by AGENCY or BSMD users.") # Forbidden: 403
if (int(user_type) == int(ParticipantType.AGENCY)) & (any_type_is_agency):
# self-assignment: agency sets agency participant
raise werkzeug.exceptions.Forbidden(f"An agency cannot self-register for a shipcall. The request is issued by an agency-user and tries to assign an AGENCY as the participant of the shipcall.") # Forbidden: 403
if len(agency_entries)>0:
# agency participant exists: participant id must be the same as shipcall participant map entry
matching_spm_entry = [spm_entry for spm_entry in spm_shipcall_data if (spm_entry.get("participant_id")==user_data["id"]) & (int(spm_entry.get("type"))==int(ParticipantType.AGENCY))]
if len(matching_spm_entry)==0:
# An AGENCY was found, but a different participant_id is assigned to that AGENCY
raise werkzeug.exceptions.Forbidden(f"A different participant_id is assigned as the AGENCY of this shipcall. Provided ID: {user_data.get('id')}, Assigned ShipcallParticipantMap: {agency_entries}") # Forbidden: 403
else:
# a matching agency was found: no violation
return
else:
# agency participant does not exist: there is no assigned agency role for the shipcall {shipcall_id}
raise werkzeug.exceptions.Forbidden(f"There is no assigned agency for this shipcall. Shipcall ID: {loadedModel['id']}") # Forbidden: 403
return
@staticmethod

View File

@ -43,7 +43,7 @@ def get_ship_id_dictionary():
def get_shipcall_id_dictionary():
# get all ships
response,status_code,header = GetShipcalls(token=None)
response,status_code,header = GetShipcalls(options={'past_days':30000})
# build a dictionary of id:item pairs, so one can select the respective participant
shipcalls = json.loads(response)

View File

@ -5,16 +5,23 @@ import jwt
import json
import requests
import datetime
import werkzeug
from marshmallow import ValidationError
from BreCal import local_db
from BreCal.schemas.model import Participant_Assignment, EvaluationType, ShipcallType
from BreCal.stubs.shipcall import create_postman_stub_shipcall, get_stub_valid_shipcall_arrival, get_stub_valid_shipcall_departure, get_stub_valid_shipcall_shifting, get_stub_shipcall_arrival_invalid_missing_eta, get_stub_shipcall_shifting_invalid_missing_eta, get_stub_shipcall_shifting_invalid_missing_etd, get_stub_shipcall_arrival_invalid_missing_type, get_stub_shipcall_departure_invalid_missing_etd
from BreCal.stubs.participant import get_stub_list_of_valid_participants
from BreCal.validators.input_validation import validation_error_default_asserts
from BreCal.schemas.model import ParticipantType
from BreCal.validators.input_validation_shipcall import InputValidationShipcall
@pytest.fixture
def get_stub_token(scope="session"):
instance_path = os.path.join(os.path.expanduser('~'), "brecal", "src", "server", "instance", "instance")
local_db.initPool(os.path.dirname(instance_path), connection_filename="connection_data_local.json")
@pytest.fixture(scope="session")
def get_stub_token():
"""
performs a login to the user 'maxm' and returns the respective url and the token. The token will be used in
further requests in the following format (example of post-request):
@ -35,6 +42,19 @@ def get_stub_token(scope="session"):
token = user.get("token")
return locals()
@pytest.fixture(scope="session")
def get_shipcall_id_after_stub_post_request(get_stub_token):
url, token = get_stub_token["url"], get_stub_token["token"]
post_data = get_stub_valid_shipcall_arrival()
response = requests.post(
f"{url}/shipcalls", headers={"Content-Type":"text", "Authorization":f"Bearer {token}"}, json=post_data
)
assert response.status_code==201
shipcall_id = response.json().get("id")
assert shipcall_id is not None
return locals()
def test_shipcall_post_request_fails_when_ship_id_is_invalid(get_stub_token):
url, token = get_stub_token["url"], get_stub_token["token"]
@ -584,3 +604,134 @@ def test_shipcall_post_type_is_wrong(get_stub_token):
)
validation_error_default_asserts(response)
return
def test_shipcall_put_request_fails_when_different_participant_id_is_assigned(get_shipcall_id_after_stub_post_request):
url, token, shipcall_id = get_shipcall_id_after_stub_post_request["token"], get_shipcall_id_after_stub_post_request["url"], get_shipcall_id_after_stub_post_request["shipcall_id"]
post_data = get_stub_valid_shipcall_arrival()
post_data["id"] = shipcall_id
user_data = {'id':6, 'participant_id':1}
loadedModel = post_data
content = post_data
spm_shipcall_data = [{'participant_id': 6, 'type': 4},
{'participant_id': 3, 'type': 1},
{'participant_id': 4, 'type': 2},
{'participant_id': 5, 'type': 8}]
# agency with different participant id is assigned
ivs = InputValidationShipcall()
with pytest.raises(werkzeug.exceptions.Forbidden, match=f"A different participant_id is assigned as the AGENCY of this shipcall. "):
ivs.check_agency_in_shipcall_participant_map(user_data, loadedModel, content, spm_shipcall_data)
return
def test_shipcall_put_request_success(get_shipcall_id_after_stub_post_request):
url, token, shipcall_id = get_shipcall_id_after_stub_post_request["token"], get_shipcall_id_after_stub_post_request["url"], get_shipcall_id_after_stub_post_request["shipcall_id"]
post_data = get_stub_valid_shipcall_arrival()
post_data["id"] = shipcall_id
# success happens, when shipcall data is valid, the user is authorized and the assigned spm shipcall data is suitable
user_data = {'id':6, 'participant_id':1}
loadedModel = post_data
content = post_data
spm_shipcall_data = [{'participant_id': 6, 'type': 8},
{'participant_id': 3, 'type': 1},
{'participant_id': 4, 'type': 2},
{'participant_id': 5, 'type': 4}]
# success
ivs = InputValidationShipcall()
ivs.check_agency_in_shipcall_participant_map(user_data, loadedModel, content, spm_shipcall_data)
return
def test_shipcall_put_request_fails_when_no_agency_is_assigned(get_shipcall_id_after_stub_post_request):
url, token, shipcall_id = get_shipcall_id_after_stub_post_request["token"], get_shipcall_id_after_stub_post_request["url"], get_shipcall_id_after_stub_post_request["shipcall_id"]
post_data = get_stub_valid_shipcall_arrival()
post_data["id"] = shipcall_id
user_data = {'id':6, 'participant_id':1}
loadedModel = post_data
content = post_data
spm_shipcall_data = [
{'participant_id': 3, 'type': 1},
{'participant_id': 4, 'type': 2},
{'participant_id': 5, 'type': 4}]
# no agency assigned
ivs = InputValidationShipcall()
with pytest.raises(werkzeug.exceptions.Forbidden, match=f"There is no assigned agency for this shipcall."):
ivs.check_agency_in_shipcall_participant_map(user_data, loadedModel, content, spm_shipcall_data)
return
def test_shipcall_put_request_fails_when_user_is_not_authorized(get_shipcall_id_after_stub_post_request):
url, token, shipcall_id = get_shipcall_id_after_stub_post_request["token"], get_shipcall_id_after_stub_post_request["url"], get_shipcall_id_after_stub_post_request["shipcall_id"]
post_data = get_stub_valid_shipcall_arrival()
post_data["id"] = shipcall_id
# user '1' is artificially set as participant 2, which has ParticipantType 4 (pilot), and is not authorized as an agency
user_data = {'id':1, 'participant_id':2}
loadedModel = post_data
content = post_data
spm_shipcall_data = [
{'participant_id': 2, 'type': 8},
{'participant_id': 3, 'type': 1},
{'participant_id': 4, 'type': 2},
{'participant_id': 5, 'type': 4}]
# current user is not authorized
ivs = InputValidationShipcall()
with pytest.raises(werkzeug.exceptions.Forbidden, match=f"PUT Requests for shipcalls can only be issued by AGENCY or BSMD users."):
ivs.check_agency_in_shipcall_participant_map(user_data, loadedModel, content, spm_shipcall_data)
return
def test_shipcall_put_request_fails_when_user_tries_self_assignment(get_shipcall_id_after_stub_post_request):
url, token, shipcall_id = get_shipcall_id_after_stub_post_request["token"], get_shipcall_id_after_stub_post_request["url"], get_shipcall_id_after_stub_post_request["shipcall_id"]
post_data = get_stub_valid_shipcall_arrival()
post_data["id"] = shipcall_id
user_data = {'id':1, 'participant_id':6}
loadedModel = post_data
content = post_data
spm_shipcall_data = [{'participant_id': 6, 'type': 8},
{'participant_id': 3, 'type': 1},
{'participant_id': 4, 'type': 2},
{'participant_id': 5, 'type': 4}]
# self-assignment. User is participant 6, and wants to assign participant 6.
ivs = InputValidationShipcall()
with pytest.raises(werkzeug.exceptions.Forbidden, match=f"An agency cannot self-register for a shipcall. The request is issued by an agency-user and tries to assign an AGENCY as the participant of the shipcall."):
ivs.check_agency_in_shipcall_participant_map(user_data, loadedModel, content, spm_shipcall_data)
return
def test_shipcall_put_request_fails_input_validation_shipcall_when_shipcall_is_canceled(get_stub_token):
url, token = get_stub_token["url"], get_stub_token["token"]
# get all shipcalls and grab shipcall with ID 4
# #TODO: there must be a better way to accomplish this easily...
response = requests.get(f"{url}/shipcalls", headers={"Content-Type":"text", "Authorization":f"Bearer {token}"}, params={"past_days":30000})
assert response.status_code==200
assert isinstance(response.json(), list)
shipcalls = response.json()
shipcall_id = 4
sh4 = [sh for sh in shipcalls if sh.get("id")==shipcall_id][0]
put_data = {k:v for k,v in sh4.items() if k in ["eta", "type", "ship_id", "arrival_berth_id", "participants"]}
put_data["id"] = shipcall_id
loadedModel = put_data
content = put_data
# a canceled shipcall cannot be selected
with pytest.raises(ValidationError, match="The shipcall with id 'shipcall_id' is canceled. A canceled shipcall may not be changed."):
InputValidationShipcall.check_shipcall_is_canceled(loadedModel, content)
return