Adding validation rules (traffic light status) to the system. Post & Put calls of shipcalls and times now execute the traffic light evaluation and store the result in the connected MySQL database instance. The 'brecal_utils' library is merged with 'BreCal', including the stub objects and test functions. Requirements were adapted, and installation of a virtual environment works from scratch (on a linux system).

This commit is contained in:
scopesorting 2023-10-17 09:09:35 +02:00 committed by puls200
parent 9f7dd4f55c
commit cdb7877461
55 changed files with 3130 additions and 3 deletions

View File

@ -46,3 +46,23 @@ def create_app(test_config=None):
logging.info('App started')
return app
from BreCal.brecal_utils.file_handling import get_project_root, ensure_path
from BreCal.brecal_utils.test_handling import execute_test_with_pytest, execute_coverage_test
from BreCal.brecal_utils.time_handling import difference_to_then
from BreCal.validators.time_logic import TimeLogic
from BreCal.validators.validation_rules import ValidationRules
from BreCal.validators.schema_validation import validation_state_and_validation_name
__all__ = [
"get_project_root",
"ensure_path",
"execute_test_with_pytest",
"execute_coverage_test",
"difference_to_then",
"TimeLogic",
"ValidationRules",
"validation_state_and_validation_name",
]

View File

View File

@ -0,0 +1,46 @@
import os
def get_project_root(root_base_name:str, root_dir:None=None):
"""
given a {root_base_name}, this function searches the parent folders of {root_dir} until
the basename matches.
Example:
root_base_name = "brecal"
root_dir = "/home/arbitrary_user/brecal/_template/tests"
returns: "/home/arbitrary_user/brecal"
arguments:
root_base_name:str, base directory name that should be searched for
root_dir: defaults to 'None', whereas then the current working directory is selected. Can be an arbitrary path.
returns: root_dir
"""
if root_dir is None:
root_dir = os.getcwd()
assert root_base_name in root_dir, f"the desired base name MUST be present within the root directory.\nRoot Directory: {root_dir}\nDesired Root Base Name: {root_base_name}"
assert root_dir.count(root_base_name)==1, f"found multiple matches for root_base_name" # do not change, as a pytest requires precise wording
while not os.path.basename(root_dir)==root_base_name:
root_dir = os.path.dirname(root_dir)
return root_dir
def ensure_path(path, print_info=0):
"""
Function ensures that a certain directory exists. If it does not exist, it will be created.
It further checks if the parent-directory of the file exists and also ensures that.
options:
print_info: print additional information (debugging)
"""
if not os.path.exists(path):
os.makedirs(path)
if print_info == 1:
print(f"Created directory and subdirectories: {path}")
return

View File

@ -0,0 +1,131 @@
import json
from abc import ABC, abstractmethod
from BreCal.schemas.model import obj_dict
"""implementation of default objects for http request codes. this enforces standardized outputs in the (response, code, headers)-style"""
def get_request_code(code_id):
"""convenience function, which returns the desired request code object"""
request_code_dict = {
200:RequestCode_HTTP_200_OK,
201:RequestCode_HTTP_201_CREATED,
400:RequestCode_HTTP_400_BAD_REQUEST,
403:RequestCode_HTTP_403_FORBIDDEN,
404:RequestCode_HTTP_404_NOT_FOUND,
500:RequestCode_HTTP_500_INTERNAL_SERVER_ERROR
}
assert code_id in list(request_code_dict.keys()), f"unsupported request code: {code_id}. \nAvailable codes: {request_code_dict}"
return request_code_dict.get(code_id)()
class RequestStatusCode(ABC):
def __init__(self):
return
@abstractmethod
def __call__(self, data):
raise NotImplementedError("any default status code object must be callable")
@abstractmethod
def status_code(self):
raise NotImplementedError("any default status code object should return an integer")
@abstractmethod
def response(self, data):
raise NotImplementedError("the response method should return a binary json object. typically, json.dumps is used")
def headers(self):
return {'Content-Type': 'application/json; charset=utf-8'}
class RequestCode_HTTP_200_OK(RequestStatusCode):
def __init__(self) -> None:
super().__init__()
def __call__(self, data):
return (self.response(data), self.status_code(), self.headers())
def status_code(self):
return 200
def response(self, data):
return json.dumps(data, default=obj_dict)
class RequestCode_HTTP_201_CREATED(RequestStatusCode):
def __init__(self) -> None:
super().__init__()
def __call__(self, data):
return (self.response(data), self.status_code(), self.headers())
def status_code(self):
return 201
def response(self, new_id):
return json.dumps({"id":new_id})
class RequestCode_HTTP_400_BAD_REQUEST(RequestStatusCode):
def __init__(self) -> None:
super().__init__()
def __call__(self, data):
return (self.response(data), self.status_code(), self.headers())
def status_code(self):
return 400
def response(self, data):
return json.dumps(data)
class RequestCode_HTTP_403_FORBIDDEN(RequestStatusCode):
def __init__(self) -> None:
super().__init__()
def __call__(self, data):
return (self.response(data), self.status_code(), self.headers())
def status_code(self):
return 403
def response(self, message="invalid credentials"):
result = {}
result["message"] = message
return json.dumps(result)
class RequestCode_HTTP_404_NOT_FOUND(RequestStatusCode):
def __init__(self) -> None:
super().__init__()
def __call__(self, data):
return (self.response(data), self.status_code(), self.headers())
def status_code(self):
return 404
def response(self, message="no such record"):
result = {}
result["message"] = message
return json.dumps(result)
class RequestCode_HTTP_500_INTERNAL_SERVER_ERROR(RequestStatusCode):
def __init__(self) -> None:
super().__init__()
def __call__(self, data):
return (self.response(data), self.status_code(), self.headers())
def status_code(self):
return 500
def response(self, message="credential lookup mismatch"):
result = {}
result["message"] = message
return json.dumps(result)

View File

@ -0,0 +1,84 @@
def execute_test_with_pytest(filepath):
"""
creates a subprocess to use 'pytest' on a script. Every function inside the filepath
will be tested individually. The function returns verbose information about the outcome.
filepath:
can either be an individual .py file or a root directory, which contains multiple files
"""
import os
import pytest
from subprocess import Popen, PIPE
assert os.path.exists(filepath), f"cannot find file {filepath}"
with Popen(['pytest',
'-v',
'-W ignore::DeprecationWarning',
'-vv',
'--durations=0',
'--tb=short', # shorter traceback format
str(filepath)], stdout=PIPE, bufsize=1,
universal_newlines=True) as p:
for line in p.stdout:
print(line, end='')
return
def execute_coverage_test(tests_path, coverage_path, cov_report_dst_dir=None, cov_fail_under_rate=80, is_test=0):
"""
creates a subprocess to use 'coverage' on a script. Every function inside the file
will be tested individually. The function returns verbose information about the outcome.
this function needs two inputs:
tests_path, a path that locates each test that should be executed
e.g.: "/home/scope_sorting/brecal/src/server/tests"
coverage_path, a path where the code is stored, which should be analyzed for coverage
e.g.: "/home/scope_sorting/brecal/src/server/BreCal"
optional:
cov_report_dst_dir, which determines, where the coverage report will be stored. This function then
creates & stores an .html and .xml report in that folder. default: None
cov_fail_under_rate, an integer which determines, when a coverage test should fail. Default: 80, meaning
that at least 80 % of the directory should be tested to pass the test.
"""
import os
import pytest
from subprocess import Popen, PIPE
assert os.path.exists(tests_path), f"cannot find root directory {tests_path}"
assert os.path.exists(coverage_path), f"cannot find root directory {coverage_path}"
if cov_report_dst_dir is not None:
p_open_list_arguments = [
'pytest',
f"{str(tests_path)}",
"-v",
"-vv",
"--durations=0",
"--cov-report=term",
f"--cov-report=html:{cov_report_dst_dir}",
f"--cov-report=xml:{cov_report_dst_dir}/coverage.xml",
f"--cov-fail-under={cov_fail_under_rate}",
f"--cov={str(coverage_path)}",
]
else:
p_open_list_arguments = [
'pytest',
f"{str(tests_path)}",
"-v",
"-vv",
"--durations=0",
"--cov-report=term",
f"--cov-fail-under={cov_fail_under_rate}",
f"--cov={str(coverage_path)}",
]
with Popen(p_open_list_arguments, stdout=PIPE, bufsize=1,
universal_newlines=True) as p:
for line in p.stdout:
print(line, end='')
if is_test:
raise KeyboardInterrupt("is_test_interrupt")

View File

@ -0,0 +1,21 @@
import datetime
def difference_to_then(event_time, tgt_time=None, make_absolute=False):
"""
measures the difference between {tgt_time} and {event_time}. this function automatically converts the datetime.timedelta object to seconds.
tgt_time defaults to {now}, if it is not specified.
Note: using divmod(time_diff, interval_duration) may be interesting to determine, how many units of {interval_duration} have passed.
e.g.,
divmod(time_diff, 3600) returns a float of hours. This will then return a tuple
options:
make_absolute: bool. Whether to return an absolute difference
Returns: time_diff (float)
"""
tgt_time = tgt_time or datetime.datetime.now()
time_diff = tgt_time - event_time
if make_absolute:
return abs(time_diff.total_seconds())
return time_diff.total_seconds()

View File

View File

@ -0,0 +1,38 @@
from enum import Enum
class ParticipantType(Enum):
"""determines the type of a participant"""
NONE = 0
BSMD = 1
TERMINAL = 2
PILOT = 4
AGENCY = 8
MOORING = 16
PORT_ADMINISTRATION = 32
TUG = 64
class ShipcallType(Enum):
"""determines the type of a shipcall, as this changes the applicable validation rules"""
INCOMING = 1
OUTGOING = 2
SHIFTING = 3
class ParticipantwiseTimeDelta():
"""stores the time delta for every participant, which triggers the validation rules in the rule set '0001'"""
AGENCY = 1200.0 # 20 h * 60 min/h = 1200 min
MOORING = 960.0 # 16 h * 60 min/h = 960 min
PILOT = 960.0 # 16 h * 60 min/h = 960 min
PORT_ADMINISTRATION = 960.0 # 16 h * 60 min/h = 960 min
TUG = 960.0 # 16 h * 60 min/h = 960 min
TERMINAL = 960.0 # 16 h * 60 min/h = 960 min
class StatusFlags(Enum):
"""
these enumerators ensure that each traffic light validation rule state corresponds to a value, which will be used in the ValidationRules object to identify
the necessity of notifications.
"""
NONE = 0
GREEN = 1
YELLOW = 2
RED = 3

View File

@ -0,0 +1,204 @@
import numpy as np
import pandas as pd
import datetime
from BreCal.schemas.model import Shipcall, Ship, Participant, Berth, User, Times
from BreCal.database.enums import ParticipantType
def pandas_series_to_data_model():
return
class SQLHandler():
"""
An object that reads SQL queries from the sql_connection and stores it in pandas DataFrames. The object can read all available tables
at once into memory, when providing 'read_all=True'.
# #TODO_initialization: shipcall_tug_map, user_role_map & role_securable_map might be mapped to the respective dataframes
"""
def __init__(self, sql_connection, read_all=False):
self.sql_connection = sql_connection
self.all_schemas = self.get_all_schemas_from_mysql()
self.build_str_to_model_dict()
if read_all:
self.read_all(self.all_schemas)
def get_all_schemas_from_mysql(self):
with self.sql_connection.cursor(buffered=True) as cursor:
cursor.execute("SHOW TABLES")
schema = cursor.fetchall()
all_schemas = [schem[0] for schem in schema]
return all_schemas
def build_str_to_model_dict(self):
"""
creates a simple dictionary, which maps a string to a data object
e.g.,
'ship'->BreCal.schemas.model.Ship object
"""
self.str_to_model_dict = {
"shipcall":Shipcall, "ship":Ship, "participant":Participant, "berth":Berth, "user":User, "times":Times
}
return
def read_mysql_table_to_df(self, table_name:str):
"""determine a {table_name}, which will be read from a mysql server. returns a pandas DataFrame with the respective data"""
df = pd.read_sql(sql=f"SELECT * FROM {table_name}", con=self.sql_connection)
return df
def mysql_to_df(self, query):
"""provide an arbitrary sql query that should be read from a mysql server {sql_connection}. returns a pandas DataFrame with the obtained data"""
df = pd.read_sql(query, self.sql_connection).convert_dtypes()
df = df.set_index('id', inplace=False) # avoid inplace updates, so the raw sql remains unchanged
return df
def read_all(self, all_schemas):
# create a dictionary, which maps every mysql schema to pandas DataFrames
self.df_dict = self.build_full_mysql_df_dict(all_schemas)
# update the 'participants' column in 'shipcall'
self.initialize_shipcall_participant_list()
return
def build_full_mysql_df_dict(self, all_schemas):
"""given a list of strings {all_schemas}, every schema will be read as individual pandas DataFrames to a dictionary with the respective keys. returns: dictionary {schema_name:pd.DataFrame}"""
mysql_df_dict = {}
for schem in all_schemas:
query = f"SELECT * FROM {schem}"
mysql_df_dict[schem] = self.mysql_to_df(query)
return mysql_df_dict
def initialize_shipcall_participant_list(self):
"""
iteratively applies the .get_participants method to each shipcall.
the function updates the 'participants' column.
"""
# 1.) get all shipcalls
df = self.df_dict.get('shipcall')
# 2.) iterate over each individual shipcall, obtain the id (pandas calls it 'name')
# and apply the 'get_participants' method, which returns a list
# if the shipcall_id exists, the list contains ids
# otherwise, return a blank list
df['participants'] = df.apply(
lambda x: self.get_participants(x.name),
axis=1)
return
def standardize_model_str(self, model_str:str)->str:
"""check if the 'model_str' is valid and apply lowercasing to the string"""
model_str = model_str.lower()
assert model_str in list(self.df_dict.keys()), f"cannot find the requested 'model_str' in mysql: {model_str}"
return model_str
def get_data(self, id:int, model_str:str):
"""
obtains {id} from the respective mysql database and builds a data model from that.
the id should match the 'id'-column in the mysql schema.
returns: data model, such as Ship, Shipcall, etc.
e.g.,
data = self.get_data(0,"shipcall")
returns a Shipcall object
"""
model_str = self.standardize_model_str(model_str)
df = self.df_dict.get(model_str)
data = self.df_loc_to_data_model(df, id, model_str)
return data
def get_all(self, model_str:str)->list:
"""
given a model string (e.g., 'shipcall'), return a list of all
data models of that type from the sql
"""
model_str = self.standardize_model_str(model_str)
all_ids = self.df_dict.get(model_str).index
all_data = [
self.get_data(_aid, model_str)
for _aid in all_ids
]
return all_data
def df_loc_to_data_model(self, df, id, model_str, loc_type:str="loc"):
assert len(df)>0, f"empty dataframe"
# get a pandas series from the dataframe
series = df.loc[id] if loc_type=="loc" else df.iloc[id]
# get the respective data model object
data_model = self.str_to_model_dict.get(model_str,None)
assert data_model is not None, f"could not find the requested model_str: {model_str}"
# build 'data' and fill the data model object
# convert the 'id' to an integer, so the np.uint64 (used by pandas) is convertible to mysql
data = {**{'id':int(id)}, **series.to_dict()} # 'id' must be added manually, as .to_dict does not contain the index, which was set with .set_index
data = data_model(**data)
return data
def get_times_for_participant_type(self, df_times, participant_type:int):
filtered_series = df_times.loc[df_times["participant_type"]==participant_type]
assert len(filtered_series)<=1, f"found multiple results"
times = self.df_loc_to_data_model(filtered_series, id=0, model_str='times', loc_type="iloc") # use iloc! to retrieve the first result
return times
def dataframe_to_data_model_list(self, df, model_str)->list:
model_str = self.standardize_model_str(model_str)
all_ids = df.index
all_data = [
self.df_loc_to_data_model(df, _aid, model_str)
for _aid in all_ids
]
return all_data
def get_participants(self, shipcall_id:id)->list:
"""
given a {shipcall_id}, obtain the respective list of participants.
when there are no participants, return a blank list
returns: participant_id_list, where every element is an int
"""
df = self.df_dict.get("shipcall_participant_map")
df = df.set_index('shipcall_id', inplace=False)
# the 'if' call is needed to ensure, that no Exception is raised, when the shipcall_id is not present in the df
participant_id_list = df.loc[shipcall_id, "participant_id"].to_list() if shipcall_id in list(df.index) else []
return participant_id_list
def get_times_of_shipcall(self, shipcall)->pd.DataFrame:
df_times = self.df_dict.get('times') # -> pd.DataFrame
df_times = df_times.loc[df_times["shipcall_id"]==shipcall.id]
return df_times
def get_times_for_agency(self, non_null_column=None)->pd.DataFrame:
"""
options:
non_null_column:
None or str. If provided, the 'non_null_column'-column of the dataframe will be filtered,
so only entries with provided values are returned (filters all NaN and NaT entries)
"""
# get all times
df_times = self.df_dict.get('times') # -> pd.DataFrame
# filter out all NaN and NaT entries
if non_null_column is not None:
df_times = df_times.loc[~df_times[non_null_column].isnull()] # NOT null filter
# filter by the agency participant_type
times_agency = df_times.loc[df_times["participant_type"]==ParticipantType.AGENCY.value]
return times_agency
def filter_df_by_key_value(self, df, key, value)->pd.DataFrame:
return df.loc[df[key]==value]
def get_unique_ship_counts(self, all_df_times:pd.DataFrame, query:str, rounding:str="min", maximum_threshold=3):
"""given a dataframe of all agency times, get all unique ship counts, their values (datetime) and the string tags. returns a tuple (values,unique,counts)"""
# get values and optional: rounding
values = all_df_times.loc[:, query]
if rounding is not None:
values = values.dt.round(rounding) # e.g., 'min'
unique, counts = np.unique(values, return_counts=True)
violation_state = np.any(np.greater(counts, maximum_threshold))
return (values, unique, counts)

View File

@ -0,0 +1,103 @@
import json
import pydapper
import pandas as pd
import mysql.connector
from BreCal.database.sql_handler import SQLHandler
from BreCal.validators.validation_rules import ValidationRules
from BreCal.schemas.model import Shipcall
def update_shipcall_in_mysql_database(sql_connection, shipcall:Shipcall, relevant_keys:list = ["evaluation", "evaluation_message"]):
"""
given an individual schemaModel (e.g., Shipcall.__dict__), update the entry within the mysql database
options:
sql_connection: an instance of mysql.connector.connect
shipcall: a Shipcall data model
relevant_keys: a list of the keys to be updated. Should never contain 'id', which is immutable
returns: (query, affected_rows)
"""
assert not "id" in relevant_keys, f"the 'id' key should never be updated."
schemaModel = shipcall.__dict__
commands = pydapper.using(sql_connection)
sentinel = object()
theshipcall = commands.query_single_or_default("SELECT * FROM shipcall where id = ?id?", sentinel, param={"id" : schemaModel["id"]})
if theshipcall is sentinel:
return json.dumps("no such record"), 404, {'Content-Type': 'application/json; charset=utf-8'}
query = build_mysql_query_to_update_shipcall(shipcall=shipcall, relevant_keys=relevant_keys)
affected_rows = commands.execute(query, param=schemaModel)
return (query, affected_rows)
def build_mysql_query_to_update_shipcall(shipcall, relevant_keys:list):
"""builds a mysql query, which updates the shipcall table. In particular, the provided shipcall will be updated for each key in {relevant_keys}"""
schemaModel = shipcall.__dict__
# prepare prefix and suffix. Then build the body of the query
prefix = "UPDATE shipcall SET "
suffix = "where id = ?id?"
body = ", ".join([f"{key} = ?{key}? " for key in schemaModel.keys() if (key in relevant_keys)]) # .join ignores the first ', ', which equals the 'isNotFirst' boolean-loop
# build query
query = f"{prefix}{body}{suffix}"
return query
def update_all_shipcalls_in_mysql_database(sql_connection, sql_handler:SQLHandler, shipcall_df:pd.DataFrame)->None:
"""
iterates over each shipcall_id in a shipcall dataframe, builds Shipcall data models and updates those in the sql database, which
is located in {sql_connection}
options:
sql_connection: an instance of mysql.connector.connect
sql_handler: an SQLHandler instance
shipcall_df: dataframe, which stores the data that is used to retrieve the shipcall data models (that are then updated in the database)
"""
for shipcall_id in shipcall_df.index:
shipcall = sql_handler.df_loc_to_data_model(df=shipcall_df, id=shipcall_id, model_str="shipcall")
update_shipcall_in_mysql_database(sql_connection, shipcall=shipcall, relevant_keys = ["evaluation", "evaluation_message"])
return
def run_validation_rules(mysql_connector_instance, shipcall_id:int=None, debug=False)->pd.DataFrame:
"""
options:
mysql_connector_instance: an instance created by the mysql.connector.connect() call. It is advised to use Python's context manager to close the connection after finished.
e.g.,
with mysql.connector.connect(**mysql_connection_data) as mysql_connector_instance:
run_validation_rules(mysql_connector_instance)
returns None
"""
sql_handler = SQLHandler(sql_connection=mysql_connector_instance, read_all=True)
vr = ValidationRules(sql_handler)
shipcall_df = sql_handler.df_dict.get("shipcall")
if shipcall_id is not None:
shipcall_df = shipcall_df.loc[[shipcall_id]]
# placeholder: filter shipcalls. For example, exclude historic entries.
shipcall_df = vr.evaluate_shipcalls(shipcall_df)
if debug:
return shipcall_df
# iterate over each shipcall in shipcall_df and update the respective entry in the mysql database
update_all_shipcalls_in_mysql_database(sql_connection=mysql_connector_instance, sql_handler=sql_handler, shipcall_df=shipcall_df)
return shipcall_df
def update_shipcall_evaluation_state(mysql_connection_data:dict, shipcall_id:int=None)->pd.DataFrame:
"""
single line function to connect to a mysql database (using the {mysql_connection_data}), evaluate each shipcall (bei traffic state)
and finally, update those in the database.
options:
mysql_connection_data: connection data to the mysql database (e.g., port, host, password)
shipcall_id: int. ID of the shipcall to be updated. Defaults to 'None'. When providing 'None', all shipcalls are updated.
"""
with mysql.connector.connect(**mysql_connection_data) as mysql_connector_instance:
shipcall_df = run_validation_rules(mysql_connector_instance=mysql_connector_instance, shipcall_id=shipcall_id, debug=False)
return shipcall_df

View File

@ -6,6 +6,8 @@ import pydapper
from ..schemas import model
from .. import local_db
from BreCal.database.update_database import evaluate_shipcall_state
def GetShipcalls(options):
"""
No parameters, gets all entries
@ -102,6 +104,9 @@ def PostShipcalls(schemaModel):
for participant_assignment in schemaModel["participants"]:
commands.execute(pquery, param={"shipcall_id" : new_id, "participant_id" : participant_assignment["participant_id"], "type" : participant_assignment["type"]})
# apply 'Traffic Light' evaluation to obtain 'GREEN', 'YELLOW' or 'RED' evaluation state. The function internally updates the mysql database
evaluate_shipcall_state(mysql_connector_instance=pooledConnection, shipcall_id=new_id) # new_id (last insert id) refers to the shipcall id
pooledConnection.close()
return json.dumps({"id" : new_id}), 201, {'Content-Type': 'application/json; charset=utf-8'}
@ -183,6 +188,9 @@ def PutShipcalls(schemaModel):
dquery = "DELETE FROM shipcall_participant_map WHERE id = ?existing_id?"
commands.execute(dquery, param={"existing_id" : elem["id"]})
# apply 'Traffic Light' evaluation to obtain 'GREEN', 'YELLOW' or 'RED' evaluation state. The function internally updates the mysql database
evaluate_shipcall_state(mysql_connector_instance=pooledConnection, shipcall_id=schemaModel["id"]) # schemaModel["id"] refers to the shipcall id
pooledConnection.close()
return json.dumps({"id" : schemaModel["id"]}), 200

View File

@ -5,6 +5,8 @@ import pydapper
from ..schemas import model
from .. import local_db
from BreCal.database.update_database import evaluate_shipcall_state
def GetTimes(options):
"""
:param options: A dictionary containing all the paramters for the Operations
@ -80,6 +82,9 @@ def PostTimes(schemaModel):
commands.execute(query, schemaModel)
new_id = commands.execute_scalar("select last_insert_id()")
# apply 'Traffic Light' evaluation to obtain 'GREEN', 'YELLOW' or 'RED' evaluation state. The function internally updates the mysql database 'shipcall'
evaluate_shipcall_state(mysql_connector_instance=pooledConnection, shipcall_id=schemaModel["shipcall_id"]) # every times data object refers to the 'shipcall_id'
pooledConnection.close()
return json.dumps({"id" : new_id}), 201, {'Content-Type': 'application/json; charset=utf-8'}
@ -122,6 +127,9 @@ def PutTimes(schemaModel):
affected_rows = commands.execute(query, param=schemaModel)
# apply 'Traffic Light' evaluation to obtain 'GREEN', 'YELLOW' or 'RED' evaluation state. The function internally updates the mysql database 'shipcall'
evaluate_shipcall_state(mysql_connector_instance=pooledConnection, shipcall_id=schemaModel["shipcall_id"]) # every times data object refers to the 'shipcall_id'
pooledConnection.close()
# if affected_rows == 1: # this doesn't work as expected

View File

View File

View File

@ -0,0 +1,5 @@
def generate_uuid1_int():
"""# TODO: clarify, what kind of integer ID is used in mysql. Generates a proxy ID, which is used in the stubs"""
from uuid import uuid1
return uuid1().int>>64

View File

@ -0,0 +1,31 @@
import datetime
from BreCal.stubs import generate_uuid1_int
from BreCal.schemas.model import Berth
def get_berth_simple():
berth_id = generate_uuid1_int() # uid?
# Note: #TODO: name, participant_id & lock state are arbitrary
name = "Avangard Dalben"
participant_id = 1# e.g., Avangard
lock = False
owner_id = 1 # e.g., Avangard
authority_id = 1 # e.g., Avangard
created = datetime.datetime.now()
modified = created+datetime.timedelta(seconds=10)
deleted = modified+datetime.timedelta(seconds=3)
berth = Berth(
berth_id,
name,
participant_id,
lock,
owner_id,
authority_id,
created,
modified,
deleted,
)
return berth

View File

@ -0,0 +1,49 @@
import datetime
from BreCal.stubs import generate_uuid1_int
from BreCal.schemas.model import Notification
def get_notification_simple():
"""creates a default notification, where 'created' is now, and modified is now+10 seconds"""
notification_id = generate_uuid1_int() # uid?
times_id = generate_uuid1_int() # uid?
acknowledged = False
level = 10
type = 0
message = "hello world"
created = datetime.datetime.now()
modified = created+datetime.timedelta(seconds=10)
notification = Notification(
notification_id,
times_id,
acknowledged,
level,
type,
message,
created,
modified
)
return notification
def get_notification_in_the_past(created_delta_seconds, modified_delta_seconds, acknowledged=False):
"""
creates a notification of the past, where the
'created' date is {created_delta_seconds} seconds ago
'modified' date is {modified_delta_seconds} seconds ago
for example, if datetime.datetime.now() returns
now = datetime.datetime(2023, 9, 15, 7, 25, 50, 733644)), then calling this function
as get_notification_modified_in_the_past(2*60, 1*60) provides
'created':datetime.datetime(2023, 9, 15, 7, 23, 50, 733644) (two minutes ago)
'modified':datetime.datetime(2023, 9, 15, 7, 24, 50, 733644) (one minute ago)
optionally, one can also overwrite the 'acknowledged' attribute
returns notification
"""
notification = get_notification_simple()
notification.created = datetime.datetime.now()-datetime.timedelta(seconds=created_delta_seconds)
notification.modified = datetime.datetime.now()-datetime.timedelta(seconds=modified_delta_seconds)
notification.acknowledged = acknowledged
return notification

View File

@ -0,0 +1,32 @@
import datetime
from BreCal.stubs import generate_uuid1_int
from BreCal.schemas.model import Participant
def get_participant_simple():
participant_id = generate_uuid1_int()
# #TODO: role_type and flags are arbitrary
name = "Max Mustermann"
street = "Musterstrasse 1"
postal_code = "12345"
city = "Bremen"
role_type = 1 # integer
flags = 0 # integer. unclear
created = datetime.datetime.now()
modified = created+datetime.timedelta(seconds=10)
deleted = modified+datetime.timedelta(seconds=3)
participant = Participant(
participant_id,
name,
street,
postal_code,
city,
role_type,
flags,
created,
modified,
deleted
)
return participant

View File

View File

@ -0,0 +1,38 @@
import datetime
from BreCal.stubs import generate_uuid1_int
from BreCal.schemas.model import Ship
def get_ship_simple():
ship_id = generate_uuid1_int()
name = "african halcyon".upper() # 'Schiffe_sample_format.xlsx' uses .upper() for every ship
imo = 9343613 # assert str(len(imo))==7
callsign = 1234567 # up to 7 characters. assert str(len(callsign))<=7
participant_id = generate_uuid1_int()
length = 177.13 # assert 0>length<=500
width = 28.4 # assert 0>width<=500
is_tug = False
bollard_pull = None # only if is_tug
participant_id = None # only if is_tug
eni = "01234567" # Alternative to IMO. Dynamic assertion? assert len(str(eni))==8
created = datetime.datetime.now()
modified = created+datetime.timedelta(seconds=10)
deleted = modified+datetime.timedelta(seconds=3)
ship = Ship(
ship_id,
name,
imo,
callsign,
participant_id,
length,
width,
is_tug,
bollard_pull,
eni,
created,
modified,
deleted
)
return ship

View File

@ -0,0 +1,81 @@
import datetime
from BreCal.stubs import generate_uuid1_int
from BreCal.schemas.model import Shipcall
from dataclasses import field
def get_shipcall_simple():
# only used for the stub
base_time = datetime.datetime.now()
shipcall_id = generate_uuid1_int()
ship_id = generate_uuid1_int()
eta = base_time+datetime.timedelta(hours=3, minutes=12)
role_type = 1
voyage = "987654321"
etd = base_time+datetime.timedelta(hours=6, minutes=12) # should never be before eta
arrival_berth_id = generate_uuid1_int()
departure_berth_id = generate_uuid1_int()
tug_required = False
pilot_required = False
flags = 0 # #TODO_shipcall_flags. What is meant here? What should be tested?
pier_side = False # whether a ship will be fixated on the pier side. en: pier side, de: Anlegestelle. From 'BremenCalling_Datenmodell.xlsx': gedreht/ungedreht
bunkering = False # #TODO_bunkering_unclear
replenishing_terminal = False # en: replenishing terminal, de: Nachfüll-Liegeplatz
replenishing_lock = False # en: replenishing lock, de: Nachfüllschleuse
draft = 0.12 # #TODO_draft_value: clarify, what 'draft' means and what kind of values are to be expected
# tidal window: built in a way, where ETA and ETD are in-between the window
# #TODO_tidal_window_source: are these windows taken from a database or provided by the user? How do they know this?
tidal_window_from = base_time+datetime.timedelta(hours=2, minutes=12)
tidal_window_to = base_time+datetime.timedelta(hours=7, minutes=12)
rain_sensitive_cargo = False
recommended_tugs = 2 # assert 0<recommended_tugs<={threshold}. E.g., 20 should not be exceeded.
anchored = False
moored_lock = False # de: 'Festmacherschleuse', en: 'moored lock'
canceled = False
evaluation = None
evaluation_message = ""
created = datetime.datetime.now()
modified = created+datetime.timedelta(seconds=10)
participants = [generate_uuid1_int(), generate_uuid1_int(), generate_uuid1_int(), generate_uuid1_int()] # field(default_factory=[generate_uuid1_int(), generate_uuid1_int(), generate_uuid1_int(), generate_uuid1_int()]) # list
shipcall = Shipcall(
shipcall_id,
ship_id,
role_type,
eta,
voyage,
etd,
arrival_berth_id,
departure_berth_id,
tug_required,
pilot_required,
flags,
pier_side,
bunkering,
replenishing_terminal,
replenishing_lock,
draft,
tidal_window_from,
tidal_window_to,
rain_sensitive_cargo,
recommended_tugs,
anchored,
moored_lock,
canceled,
evaluation,
evaluation_message,
created,
modified,
participants,
)
return shipcall

View File

View File

@ -0,0 +1,67 @@
"""
this stub creates an example time object, where the times of every role are present.
users will thereby be able to modify these values
"""
import datetime
from BreCal.stubs import generate_uuid1_int
from BreCal.schemas.model import Times
def get_times_full_simple():
# only used for the stub
base_time = datetime.datetime.now()
# note 1: eta and etd should never individually be in reverse order. assert etd>eta (for berth) & lock
# note 2: times are currently computed as a sequence of (eta_berth -> lock_time -> etd_berth -> zone_entry). The deltas are arbitrary
times_id = generate_uuid1_int()
eta_berth = base_time+datetime.timedelta(hours=1, minutes=12)
eta_berth_fixed = False
lock_time = eta_berth+datetime.timedelta(hours=0, minutes=50)
lock_time_fixed = False
etd_berth = lock_time+datetime.timedelta(hours=0, minutes=45)
etd_berth_fixed = False
zone_entry = etd_berth+datetime.timedelta(hours=0, minutes=15)
zone_entry_fixed = False
operations_start = zone_entry+datetime.timedelta(hours=1, minutes=30)
operations_end = operations_start+datetime.timedelta(hours=4, minutes=30)
remarks = "" # assert len(remarks)<{max_len_threshold}
participant_id = generate_uuid1_int()
shipcall_id = generate_uuid1_int()
berth_id = generate_uuid1_int()
berth_info = ""
pier_side = True
participant_type = None
created = datetime.datetime.now()
modified = created+datetime.timedelta(seconds=10)
times = Times(
id=times_id,
eta_berth=eta_berth,
eta_berth_fixed=eta_berth_fixed,
etd_berth=etd_berth,
etd_berth_fixed=etd_berth_fixed,
lock_time=lock_time,
lock_time_fixed=lock_time_fixed,
zone_entry=zone_entry,
zone_entry_fixed=zone_entry_fixed,
operations_start=operations_start,
operations_end=operations_end,
remarks=remarks,
participant_id=participant_id,
berth_id=berth_id,
berth_info=berth_info,
pier_side=pier_side,
participant_type=participant_type,
shipcall_id=shipcall_id,
created=created,
modified=modified,
)
return times

View File

View File

View File

@ -0,0 +1,35 @@
import bcrypt
import datetime
from BreCal.stubs import generate_uuid1_int
from BreCal.schemas.model import User
def get_user_simple():
user_id = generate_uuid1_int()
participant_id = generate_uuid1_int() # should be taken from the database
first_name = "Max"
last_name = "Mustermann"
user_name = "maxm123"
user_email = "max.mustermann@brecal.de"
user_phone = "0173123456" # formatting?
password_hash = bcrypt.hashpw("123456".encode('utf-8'), bcrypt.gensalt( 12 )).decode('utf8')
api_key = bcrypt.hashpw("apikey123".encode('utf-8'), bcrypt.gensalt( 12 )).decode('utf8')
created = datetime.datetime.now()
modified = created+datetime.timedelta(seconds=10)
user = User(
user_id,
participant_id,
first_name,
last_name,
user_name,
user_email,
user_phone,
password_hash,
api_key,
created,
modified
)
return user

View File

View File

@ -0,0 +1,165 @@
####################################### InputValidation #######################################
from abc import ABC, abstractmethod
from BreCal.schemas.model import Ship, Shipcall, Berth, User, Participant
class InputValidation():
def __init__(self):
self.build_supported_models_dictionary()
return
def build_supported_models_dictionary(self):
self.supported_models = {
Ship:ShipValidation(),
Shipcall:ShipcallValidation(),
Berth:BerthValidation(),
User:UserValidation(),
Participant:ParticipantValidation(),
}
return
def assert_if_not_supported(self, dataclass_object):
assert type(dataclass_object) in self.supported_models, f"unsupported model. Found: {type(dataclass_object)}"
return
def verify(self, dataclass_object):
self.assert_if_not_supported(dataclass_object)
# determine the type of the dataclass object. The internal dictionary 'supported_models' matches the dataclass object
# to the respective validation protocol
validator = self.supported_models.get(type(dataclass_object))
# check the object based on the rules within the matched validator
input_validation_state = validator.check(dataclass_object)
return input_validation_state
class DataclassValidation(ABC):
"""parent class of dataclas validators, which determines the outline of every object"""
def __init__(self):
return
def check(self, dataclass_object) -> (list, bool):
"""
the 'check' method provides a default style, how each dataclass object is validated. It returns a list of violations
and a boolean, which determines, whether the check is passed successfully
"""
all_rules = self.apply_all_rules(dataclass_object)
violations = self.filter_violations(all_rules)
input_validation_state = self.evaluate(violations)
return (violations, input_validation_state)
@abstractmethod
def apply_all_rules(self, dataclass_object) -> list:
"""
the 'apply_all_rules' method is mandatory for any dataclass validation object. It should execute all validation rules and
return a list of tuples, where each element is (output_boolean, validation_name)
"""
all_rules = [(True, 'blank_validation_rule')]
return all_rules
def filter_violations(self, all_rules):
"""input: all_rules, a list of tuples, where each element is (output, validation_name), which are (bool, str). """
# if output is False, a violation is observed
violations = [result[1] for result in all_rules if not result[0]]
return violations
def evaluate(self, violations) -> bool:
input_validation_state = len(violations)==0
return input_validation_state
class ShipcallValidation(DataclassValidation):
"""an object that validates a Shipcall dataclass object"""
def __init__(self):
super().__init__()
return
def apply_all_rules(self, dataclass_object) -> list:
"""apply all input validation rules to determine, whether there are violations. returns a list of tuples (output, validation_name)"""
raise NotImplementedError()
return all_rules
from BreCal.validators.schema_validation import ship_bollard_pull_is_defined_or_is_not_tug, ship_bollard_pull_is_none_or_in_range, ship_callsign_len_is_seven_at_maximum, ship_eni_len_is_eight, ship_imo_len_is_seven, ship_length_in_range, ship_participant_id_is_defined_or_is_not_tug, ship_participant_id_is_none_or_int, ship_width_in_range
# skip: ship_max_draft_is_defined_or_is_not_tug, ship_max_draft_is_none_or_in_range,
class ShipValidation(DataclassValidation):
"""an object that validates a Ship dataclass object"""
def __init__(self):
super().__init__()
return
def apply_all_rules(self, dataclass_object) -> list:
"""apply all input validation rules to determine, whether there are violations. returns a list of tuples (output, validation_name)"""
# skip: ship_max_draft_is_defined_or_is_not_tug, ship_max_draft_is_none_or_in_range,
"""
#TODO_ship_max_draft
with pytest.raises(AttributeError, match="'Ship' object has no attribute 'max_draft'"):
assert ship_max_draft_in_range(ship)[0], f"max draft of a ship must be between 0 and 20 meters"
assert ship_max_draft_is_none_or_in_range(ship)[0], f"the max_draft should either be undefined or between 0 and 20 meters"
"""
# list comprehension: every function becomes part of the loop and will be executed. Each function is wrapped and provides (output, validation_name)
all_rules = [
# tuple: (output, validation_name)
check_rule(dataclass_object)
for check_rule in [
ship_bollard_pull_is_defined_or_is_not_tug,
ship_bollard_pull_is_none_or_in_range,
ship_callsign_len_is_seven_at_maximum,
ship_eni_len_is_eight,
ship_imo_len_is_seven,
ship_length_in_range,
ship_participant_id_is_defined_or_is_not_tug,
ship_participant_id_is_none_or_int,
ship_width_in_range
]
]
return all_rules
class BerthValidation(DataclassValidation):
"""an object that validates a Berth dataclass object"""
def __init__(self):
super().__init__()
return
def apply_all_rules(self, dataclass_object) -> list:
"""apply all input validation rules to determine, whether there are violations. returns a list of tuples (output, validation_name)"""
raise NotImplementedError()
return all_rules
class UserValidation(DataclassValidation):
"""an object that validates a User dataclass object"""
def __init__(self):
super().__init__()
return
def apply_all_rules(self, dataclass_object) -> list:
"""apply all input validation rules to determine, whether there are violations. returns a list of tuples (output, validation_name)"""
raise NotImplementedError()
return all_rules
from BreCal.validators.schema_validation import participant_postal_code_len_is_five
class ParticipantValidation(DataclassValidation):
"""an object that validates a Participant dataclass object"""
def __init__(self):
super().__init__()
return
def apply_all_rules(self, dataclass_object) -> list:
"""apply all input validation rules to determine, whether there are violations. returns a list of tuples (output, validation_name)"""
# list comprehension: every function becomes part of the loop and will be executed. Each function is wrapped and provides (output, validation_name)
all_rules = [
# tuple: (output, validation_name)
check_rule(dataclass_object)
for check_rule in [
participant_postal_code_len_is_five,
]
]
return all_rules

View File

@ -0,0 +1,139 @@
# wrapper: every validation function returns a tuple of (validation_state, validation_name)
# example: validate_ship_eni_length might return the tuple (True, 'ship_eni_length')
# thereby, one could always know, which test causes an issue
####################################### general functions #######################################
def validation_state_and_validation_name(validation_name):
"""
can wrap arbitrary functions, so they return (output, validation_name)-tuples
usage example:
@validation_state_and_validation_name("ship_eni_length")
def validate_ship_eni_length(ship):
return length_matches_exactly(ship.eni,8)
"""
def wrapper(validation_fct):
def decorated_fct(*args, **kwargs):
return (validation_fct(*args, **kwargs), validation_name)
return decorated_fct
return wrapper
def value_in_range(query_value, start_range, end_range):
"""determines, whether the query_value is greater than start_range, but smaller than end_range. Returns bool"""
return start_range<query_value<end_range
def length_is_at_maximum(query_value, max_len):
"""determines, whether the query_value's length is l<={max_len}. Returns bool"""
return len(str(query_value))<=max_len
def length_matches_exactly(query_value, length_value):
"""determines, whether the query_value's length is exactly l=={length_value}. Returns bool"""
return len(str(query_value)) == length_value
####################################### dataclass specifics #######################################
### Ship dataclass (BreCal.schema.model.Ship) ###
@validation_state_and_validation_name("ship_bollard_pull_none_or_value_in_range")
def ship_bollard_pull_is_none_or_in_range(ship):
"""a ship should either have its bollard_pull between 0 and 500, or have an undefined bollard_pull (when not a tug)"""
return (ship.bollard_pull is None) or (value_in_range(ship.bollard_pull, 0, 500))
@validation_state_and_validation_name("ship_max_draft_none_or_value_in_range")
def ship_max_draft_is_none_or_in_range(ship):
"""a ship should either have its max_draft between 0 and 20, or have an undefined max_draft (when not a tug)"""
return (ship.max_draft is None) or (value_in_range(ship.max_draft, 0, 20))
@validation_state_and_validation_name("ship_participant_id_none_or_int")
def ship_participant_id_is_none_or_int(ship):
"""a ship should either have its participant_id defined (integer when ship is a tug), or have an undefined participant_id (when not a tug)"""
return isinstance(ship.participant_id, int) or (ship.participant_id is None)
@validation_state_and_validation_name("ship_length")
def ship_length_in_range(ship):
"""ship length-values should be valid. between 0 and 500 meters is plausible. returns bool"""
return value_in_range(ship.length, 0, 500)
@validation_state_and_validation_name("ship_width")
def ship_width_in_range(ship):
"""ship length-values should be valid. between 0 and 500 meters is plausible. returns bool"""
return value_in_range(ship.width, 0, 500)
@validation_state_and_validation_name("ship_eni_length")
def ship_eni_len_is_eight(ship):
"""eni-no. are standardized. They should have exactly eight characters. returns bool"""
return length_matches_exactly(ship.eni,8)
@validation_state_and_validation_name("ship_imo_length")
def ship_imo_len_is_seven(ship):
"""IMO-numbers are standardized. They should have exactly seven characters. returns bool"""
return length_matches_exactly(ship.imo,7)
@validation_state_and_validation_name("ship_callsign_length")
def ship_callsign_len_is_seven_at_maximum(ship):
"""the ship's callsign should have l<=7 characters. returns bool"""
return length_is_at_maximum(ship.callsign, 7)
def ship_is_not_tug_or_key_is_defined(is_tug, key_):
""" # base function
function that checks, if a Ship dataclass is either
a) not a tug
b) has a defined value of {key_}
can be used for max_draft, participant_id and bollard_pull
"""
return (not is_tug) or (key_ is not None)
@validation_state_and_validation_name("ship_bollard_pull_dynamically_mandatory")
def ship_bollard_pull_is_defined_or_is_not_tug(ship):
"""
there are two valid cases for the bollard_pull:
a) bollard_pull is undefined (None), if the ship is not a tug
b) bollard_pull is defined, if the ship is a tug
if the ship is a tug, a separate function validates in addition, if the value is in an accepted range
returns bool
"""
return ship_is_not_tug_or_key_is_defined(ship.is_tug, ship.bollard_pull)
@validation_state_and_validation_name("ship_max_draft_dynamically_mandatory")
def ship_max_draft_is_defined_or_is_not_tug(ship):
"""
there are two valid cases for the max_draft:
a) max_draft is undefined (None), if the ship is not a tug
b) max_draft is defined, if the ship is a tug
if the ship is a tug, a separate function validates in addition, if the value is in an accepted range
returns bool
"""
return ship_is_not_tug_or_key_is_defined(ship.is_tug, ship.max_draft)
# #TODO_ship_tug_participant_id: is this semantically correct? Will the participant_id be entered or automatically filled?
@validation_state_and_validation_name("ship_max_draft_dynamically_mandatory")
def ship_participant_id_is_defined_or_is_not_tug(ship):
"""
there are two valid cases for the max_draft:
a) participant_id is undefined (None), if the ship is not a tug
b) participant_id is defined, if the ship is a tug
returns bool
"""
return ship_is_not_tug_or_key_is_defined(ship.is_tug, ship.participant_id)
### Participant dataclass (BreCal.schema.model.Participant) ###
@validation_state_and_validation_name("participant_postal_code_length")
def participant_postal_code_len_is_five(participant):
"""
validates, that a postal code has 5 characters. returns bool
# #TODO_postal_code_length_validation: might make sense to request postal_code<=5 characters
# is the 5-character requirement true when international ships arive?
"""
return length_matches_exactly(participant.postal_code, 5)

View File

@ -0,0 +1,91 @@
import datetime
import numpy as np
import pandas as pd
class TimeLogic():
def __init__(self):
return
def time_delta(self, src_time, tgt_time, unit:str="m"):
"""
in brief, this function measures tgt_time - src_time
if the tgt_time is in the future, it is a positive value (tgt_time > src_time)
if the tgt_time is in the past, it is a negative value (tgt_time < src_time)
returns the delta between tgt_time and src_time as a float of minutes (or the optionally provided unit)
options:
unit: str, which defaults to 'm' (minutes). 'h' (hours) or 's' (seconds) are also common units. Determines the unit of the output time delta
"""
# convert np.datetime64
if isinstance(src_time, pd.Timestamp):
src_time = src_time.to_datetime64()
if isinstance(tgt_time, pd.Timestamp):
tgt_time = tgt_time.to_datetime64()
if isinstance(src_time, datetime.datetime):
src_time = np.datetime64(src_time)
if isinstance(tgt_time, datetime.datetime):
tgt_time = np.datetime64(tgt_time)
delta = tgt_time - src_time
minute_delta = delta / np.timedelta64(1, unit)
return minute_delta
def time_delta_from_now_to_tgt(self, tgt_time, unit="m"):
return self.time_delta(datetime.datetime.now(), tgt_time=tgt_time, unit=unit)
def time_inbetween(self, query_time:datetime.datetime, start_time:datetime.datetime, end_time:datetime.datetime) -> bool:
"""
checks, whether the query time is inbetween the start & end time. Returns a bool to indicate that.
Example:
a = datetime.datetime(2017, 5, 16, 8, 21, 10)
b = datetime.datetime(2017, 5, 17, 8, 21, 10)
c = datetime.datetime(2017, 5, 18, 8, 21, 10)
is b between a and c? -> yes. Returns True
is c between a and b? -> no. Returns False
returns bool
"""
assert isinstance(query_time, datetime.datetime)
assert isinstance(start_time, datetime.datetime)
assert isinstance(end_time, datetime.datetime)
return start_time <= query_time <= end_time
def time_inbetween_absolute_delta(self, query_time:datetime.datetime, start_time:datetime.datetime, end_time:datetime.datetime) -> tuple:
"""
similarly to self.time_inbetween, this function compares a query_time with the provided start and end time.
however, this function instead returns timedelta objects, which show the difference towards start and end
this function applies abs() to return only absolute deviations. Thereby, -23 becomes +23
returns: tuple(absolute_start_delta, absolute_end_delta)
"""
return (abs(query_time-start_time), abs(query_time-end_time))
def compare_query_is_inbetween_list(self, query_time, list_of_other_times) -> list:
list_of_bools = [
self.time_inbetween(query_time, time_elem_begin, time_elem_end)
for (time_elem_begin, time_elem_end) in list_of_other_times
]
return list_of_bools
def query_time_any_inbetween(self, query_time, list_of_other_times):
"""
given a query_time element, the element will be compared to every element in a list, where each
element is a tuple of (start_time, end_time)
"""
if len(list_of_other_times)==0:
# the time is not inbetween, if the provided list is empty
return False
list_of_bools = self.compare_query_is_inbetween_list(query_time, list_of_other_times)
return np.any(list_of_bools), list_of_bools

View File

@ -0,0 +1,763 @@
import inspect
import types
from BreCal.database.enums import ParticipantType, ShipcallType, ParticipantwiseTimeDelta
import numpy as np
import pandas as pd
from BreCal.validators.time_logic import TimeLogic
from BreCal.database.enums import StatusFlags
#from BreCal.validators.schema_validation import validation_state_and_validation_name
class ValidationRuleBaseFunctions():
"""
Base object with individual functions, which the {ValidationRuleFunctions}-child refers to.
This parent class provides base functions and helps to restructure the code in a more comprehensible way.
"""
def __init__(self, sql_handler):
self.sql_handler = sql_handler
self.time_logic = TimeLogic()
def check_time_delta_violation_query_time_to_now(self, query_time:pd.Timestamp, key_time:pd.Timestamp, threshold:float)->bool:
"""
# base function for all validation rules in the group {0001} A-L
measures the time between NOW and query_time.
When the query_time lays in the past, the delta is negative
when the query_time lays in the future, the delta is positive
returns a violation state depending on whether the delta is
Violation, if: 0 >= delta > threshold
When the key time is defined (not None), there is no violation. Returns False
options:
query_time: will be used to measure the time difference of 'now' until the query time
key_time: will be used to check, whether the respective key already has a value
threshold: threshold where a time difference becomes crucial. When the delta is below the threshold, a violation might occur
"""
# rule is not applicable -> return 'GREEN'
if key_time is not None:
return False
# otherwise, this rule applies and the difference between 'now' and the query time is measured
delta = self.time_logic.time_delta_from_now_to_tgt(tgt_time=query_time, unit="m")
# a violation occurs, when the delta (in minutes) exceeds the specified threshold of a participant
# to prevent past-events from triggering violations, negative values are ignored
# Violation, if 0 >= delta >= threshold
violation_state = (delta >= 0) and (delta<=threshold)
return violation_state
def check_participants_agree_on_estimated_time(self, shipcall, query, df_times, applicable_shipcall_type)->bool:
"""
# base function for all validation rules in the group {0002} A-C
compares, whether the participants agree on the estimated time (of arrival or departure), depending on
whether the shipcall type is incoming, outgoing or shifting.
No violations are observed, when
- the shipcall belongs to a different type than the rule expects
- there are no matching times for the provided {query} (e.g., "eta_berth")
Instead of comparing each individual result, this function counts the amount of unique instances.
When there is not only one unique value, there are deviating time estimates, and a violation occurs
returns: violation_state (bool)
"""
# shipcall type filter: consider only shipcalls, where the type matches
if shipcall.type != applicable_shipcall_type.value:
violation_state = False
return violation_state
# filter by participant types of interest (agency, mooring, portauthority/administration, pilot, tug)
participant_types = [ParticipantType.AGENCY.value, ParticipantType.MOORING.value, ParticipantType.PORT_ADMINISTRATION.value, ParticipantType.PILOT.value, ParticipantType.TUG.value]
df_times = df_times.loc[df_times["participant_type"].isin(participant_types),:]
# exclude missing entries
df_times.loc[~df_times[query].isnull(),:]
# when there are no entries left (no entries are provided), skip
if len(df_times)==0:
violation_state = False
return violation_state
# there should only be one eta_berth, when all participants have provided the same time
# this equates to the same criteria as checking, whether
# times_agency.eta_berth==times_mooring.eta_berth==times_portadministration.eta_berth==times_pilot.eta_berth==times_tug.eta_berth
unique_times = len(pd.unique(df_times.loc[:,query]))
violation_state = unique_times!=1
return violation_state
def check_unique_shipcall_counts(self, query:str, rounding="min", maximum_threshold=3)->bool:
"""
# base function for all validation rules in the group {0005} A&B
compares how many unique times are found for the provided {query} (e.g., "eta_berth")
This function rounds the results, counts the unique values and returns a boolean state, whether the {maximum_threshold} is exceeded
"""
# filter the df: keep only times_agents
# filter out all NaN and NaT entries
times_agency = self.sql_handler.get_times_for_agency(non_null_column=query)
# get values and optionally round the values
(values, unique, counts) = self.sql_handler.get_unique_ship_counts(all_df_times=times_agency, query=query, rounding=rounding, maximum_threshold=maximum_threshold)
# when ANY of the unique values exceeds the threshold, a violation is observed
violation_state = np.any(np.greater(counts, maximum_threshold))
return violation_state
class ValidationRuleFunctions(ValidationRuleBaseFunctions):
"""
an accumulation object that makes sure, that any validation rule is translated to a function with default naming convention and
return types. Each function should return a ValidationRuleState enumeration object and a description string to which validation rule
the result belongs. These are returned as tuples (ValidationRuleState, validation_name)
Each rule should have the same input arguments (self, shipcall, df_times, *args, **kwargs)
The object makes heavy use of calls from an SQLHandler object, which provides functions for dataframe access and filtering.
each validation_name is generated by calling the function inside a method
validation_name = inspect.currentframe().f_code.co_name # validation_name then returns the name of the method from where 'currentframe()' was called.
# example:
#def validation_rule_fct_example(self, shipcall, df_times):
#validation_name = inspect.currentframe().f_code.co_name
#return (ValidationRuleState.NONE, validation_name)
"""
def __init__(self, sql_handler):
super().__init__(sql_handler)
return
def get_validation_rule_functions(self):
"""return a list of all methods in this object, which are all validation rule functions."""
return [self.__getattribute__(mthd_) for mthd_ in dir(self) if ('validation_rule_fct' in mthd_) and (isinstance(self.__getattribute__(mthd_), types.MethodType))]
def validation_rule_fct_missing_time_agency_berth_eta(self, shipcall, df_times, *args, **kwargs):
"""
Code: #0001-A
Type: Local Rule
Description: this validation checks, whether there is a missing time. When the difference between an event (e.g., the shipcall eta) is below
a certain threshold (e.g., 20 hours), a violation occurs
0001-A:
- Checks, if times_agency.eta_berth is filled in.
- Measures the difference between 'now' and 'shipcall.eta'.
"""
# check, if the header is filled in (agency)
if len(df_times.loc[df_times["participant_type"].isin([ParticipantType.AGENCY.value])]) != 1:
return (StatusFlags.GREEN, None)
# preparation: obtain the correct times of the participant, define the query time and the key time
times_agency = self.sql_handler.get_times_for_participant_type(df_times, participant_type=ParticipantType.AGENCY.value)
query_time = shipcall.eta
key_time = times_agency.eta_berth
threshold = ParticipantwiseTimeDelta.AGENCY
violation_state = self.check_time_delta_violation_query_time_to_now(query_time=query_time, key_time=key_time, threshold=threshold)
if violation_state:
validation_name = inspect.currentframe().f_code.co_name
return (StatusFlags.YELLOW, validation_name)
else:
return (StatusFlags.GREEN, None)
def validation_rule_fct_missing_time_agency_berth_etd(self, shipcall, df_times, *args, **kwargs):
"""
Code: #0001-B
Type: Local Rule
Description: this validation checks, whether there is a missing time. When the difference between an event (e.g., the shipcall eta) is below
a certain threshold (e.g., 20 hours), a violation occurs
0001-B:
- Checks, if times_agency.etd_berth is filled in.
- Measures the difference between 'now' and 'shipcall.etd'.
"""
# check, if the header is filled in (agency)
if len(df_times.loc[df_times["participant_type"].isin([ParticipantType.AGENCY.value])]) != 1:
return (StatusFlags.GREEN, None)
# preparation: obtain the correct times of the participant, define the query time and the key time
times_agency = self.sql_handler.get_times_for_participant_type(df_times, participant_type=ParticipantType.AGENCY.value)
query_time = shipcall.etd
key_time = times_agency.etd_berth
threshold = ParticipantwiseTimeDelta.AGENCY
violation_state = self.check_time_delta_violation_query_time_to_now(query_time=query_time, key_time=key_time, threshold=threshold)
if violation_state:
validation_name = inspect.currentframe().f_code.co_name
return (StatusFlags.YELLOW, validation_name)
else:
return (StatusFlags.GREEN, None)
def validation_rule_fct_missing_time_mooring_berth_eta(self, shipcall, df_times, *args, **kwargs):
"""
Code: #0001-C
Type: Local Rule
Description: this validation checks, whether there is a missing time. When the difference between an event (e.g., the shipcall eta) is below
a certain threshold (e.g., 20 hours), a violation occurs
0001-C:
- Checks, if times_mooring.eta_berth is filled in.
- Measures the difference between 'now' and 'times_agency.eta_berth'.
"""
# check, if the header is filled in (agency & MOORING)
if len(df_times.loc[df_times["participant_type"].isin([ParticipantType.AGENCY.value, ParticipantType.MOORING.value])]) != 2:
return (StatusFlags.GREEN, None)
# preparation: obtain the correct times of the participant, define the query time and the key time
times_agency = self.sql_handler.get_times_for_participant_type(df_times, participant_type=ParticipantType.AGENCY.value)
times_mooring = self.sql_handler.get_times_for_participant_type(df_times, participant_type=ParticipantType.MOORING.value)
query_time = times_agency.eta_berth
key_time = times_mooring.eta_berth
threshold = ParticipantwiseTimeDelta.MOORING
violation_state = self.check_time_delta_violation_query_time_to_now(query_time=query_time, key_time=key_time, threshold=threshold)
if violation_state:
validation_name = inspect.currentframe().f_code.co_name
return (StatusFlags.YELLOW, validation_name)
else:
return (StatusFlags.GREEN, None)
def validation_rule_fct_missing_time_mooring_berth_etd(self, shipcall, df_times, *args, **kwargs):
"""
Code: #0001-D
Type: Local Rule
Description: this validation checks, whether there is a missing time. When the difference between an event (e.g., the shipcall eta) is below
a certain threshold (e.g., 20 hours), a violation occurs
0001-D:
- Checks, if times_mooring.etd_berth is filled in.
- Measures the difference between 'now' and 'times_agency.etd_berth'.
"""
# check, if the header is filled in (agency & MOORING)
if len(df_times.loc[df_times["participant_type"].isin([ParticipantType.AGENCY.value, ParticipantType.MOORING.value])]) != 2:
return (StatusFlags.GREEN, None)
# preparation: obtain the correct times of the participant, define the query time and the key time
times_agency = self.sql_handler.get_times_for_participant_type(df_times, participant_type=ParticipantType.AGENCY.value)
times_mooring = self.sql_handler.get_times_for_participant_type(df_times, participant_type=ParticipantType.MOORING.value)
query_time = times_agency.etd_berth
key_time = times_mooring.etd_berth
threshold = ParticipantwiseTimeDelta.MOORING
violation_state = self.check_time_delta_violation_query_time_to_now(query_time=query_time, key_time=key_time, threshold=threshold)
if violation_state:
validation_name = inspect.currentframe().f_code.co_name
return (StatusFlags.YELLOW, validation_name)
else:
return (StatusFlags.GREEN, None)
def validation_rule_fct_missing_time_portadministration_berth_eta(self, shipcall, df_times, *args, **kwargs):
"""
Code: #0001-F
Type: Local Rule
Description: this validation checks, whether there is a missing time. When the difference between an event (e.g., the shipcall eta) is below
a certain threshold (e.g., 20 hours), a violation occurs
0001-F:
- Checks, if times_port_administration.eta_berth is filled in.
- Measures the difference between 'now' and 'times_agency.eta_berth'.
"""
# check, if the header is filled in (agency & PORT_ADMINISTRATION)
if len(df_times.loc[df_times["participant_type"].isin([ParticipantType.AGENCY.value, ParticipantType.PORT_ADMINISTRATION.value])]) != 2:
return (StatusFlags.GREEN, None)
# preparation: obtain the correct times of the participant, define the query time and the key time
times_agency = self.sql_handler.get_times_for_participant_type(df_times, participant_type=ParticipantType.AGENCY.value)
times_port_administration = self.sql_handler.get_times_for_participant_type(df_times, participant_type=ParticipantType.PORT_ADMINISTRATION.value)
query_time = times_agency.eta_berth
key_time = times_port_administration.eta_berth
threshold = ParticipantwiseTimeDelta.PORT_ADMINISTRATION
violation_state = self.check_time_delta_violation_query_time_to_now(query_time=query_time, key_time=key_time, threshold=threshold)
if violation_state:
validation_name = inspect.currentframe().f_code.co_name
return (StatusFlags.YELLOW, validation_name)
else:
return (StatusFlags.GREEN, None)
def validation_rule_fct_missing_time_portadministration_berth_etd(self, shipcall, df_times, *args, **kwargs):
"""
Code: #0001-G
Type: Local Rule
Description: this validation checks, whether there is a missing time. When the difference between an event (e.g., the shipcall eta) is below
a certain threshold (e.g., 20 hours), a violation occurs
0001-G:
- Checks, if times_port_administration.etd_berth is filled in.
- Measures the difference between 'now' and 'times_agency.etd_berth'.
"""
# check, if the header is filled in (agency & PORT_ADMINISTRATION)
if len(df_times.loc[df_times["participant_type"].isin([ParticipantType.AGENCY.value, ParticipantType.PORT_ADMINISTRATION.value])]) != 2:
return (StatusFlags.GREEN, None)
# preparation: obtain the correct times of the participant, define the query time and the key time
times_agency = self.sql_handler.get_times_for_participant_type(df_times, participant_type=ParticipantType.AGENCY.value)
times_port_administration = self.sql_handler.get_times_for_participant_type(df_times, participant_type=ParticipantType.PORT_ADMINISTRATION.value)
query_time = times_agency.etd_berth
key_time = times_port_administration.etd_berth
threshold = ParticipantwiseTimeDelta.PORT_ADMINISTRATION
violation_state = self.check_time_delta_violation_query_time_to_now(query_time=query_time, key_time=key_time, threshold=threshold)
if violation_state:
validation_name = inspect.currentframe().f_code.co_name
return (StatusFlags.YELLOW, validation_name)
else:
return (StatusFlags.GREEN, None)
def validation_rule_fct_missing_time_pilot_berth_eta(self, shipcall, df_times, *args, **kwargs):
"""
Code: #0001-H
Type: Local Rule
Description: this validation checks, whether there is a missing time. When the difference between an event (e.g., the shipcall eta) is below
a certain threshold (e.g., 20 hours), a violation occurs
0001-H:
- Checks, if times_pilot.eta_berth is filled in.
- Measures the difference between 'now' and 'times_agency.eta_berth'.
"""
# check, if the header is filled in (agency & PILOT)
if len(df_times.loc[df_times["participant_type"].isin([ParticipantType.AGENCY.value, ParticipantType.PILOT.value])]) != 2:
return (StatusFlags.GREEN, None)
# preparation: obtain the correct times of the participant, define the query time and the key time
times_agency = self.sql_handler.get_times_for_participant_type(df_times, participant_type=ParticipantType.AGENCY.value)
times_pilot = self.sql_handler.get_times_for_participant_type(df_times, participant_type=ParticipantType.PILOT.value)
query_time = times_agency.eta_berth
key_time = times_pilot.eta_berth
threshold = ParticipantwiseTimeDelta.PILOT
violation_state = self.check_time_delta_violation_query_time_to_now(query_time=query_time, key_time=key_time, threshold=threshold)
if violation_state:
validation_name = inspect.currentframe().f_code.co_name
return (StatusFlags.YELLOW, validation_name)
else:
return (StatusFlags.GREEN, None)
def validation_rule_fct_missing_time_pilot_berth_etd(self, shipcall, df_times, *args, **kwargs):
"""
Code: #0001-I
Type: Local Rule
Description: this validation checks, whether there is a missing time. When the difference between an event (e.g., the shipcall eta) is below
a certain threshold (e.g., 20 hours), a violation occurs
0001-I:
- Checks, if times_pilot.etd_berth is filled in.
- Measures the difference between 'now' and 'times_agency.etd_berth'.
"""
# check, if the header is filled in (agency & PILOT)
if len(df_times.loc[df_times["participant_type"].isin([ParticipantType.AGENCY.value, ParticipantType.PILOT.value])]) != 2:
return (StatusFlags.GREEN, None)
# preparation: obtain the correct times of the participant, define the query time and the key time
times_agency = self.sql_handler.get_times_for_participant_type(df_times, participant_type=ParticipantType.AGENCY.value)
times_pilot = self.sql_handler.get_times_for_participant_type(df_times, participant_type=ParticipantType.PILOT.value)
query_time = times_agency.etd_berth
key_time = times_pilot.etd_berth
threshold = ParticipantwiseTimeDelta.PILOT
violation_state = self.check_time_delta_violation_query_time_to_now(query_time=query_time, key_time=key_time, threshold=threshold)
if violation_state:
validation_name = inspect.currentframe().f_code.co_name
return (StatusFlags.YELLOW, validation_name)
else:
return (StatusFlags.GREEN, None)
def validation_rule_fct_missing_time_tug_berth_eta(self, shipcall, df_times, *args, **kwargs):
"""
Code: #0001-J
Type: Local Rule
Description: this validation checks, whether there is a missing time. When the difference between an event (e.g., the shipcall eta) is below
a certain threshold (e.g., 20 hours), a violation occurs
0001-J:
- Checks, if times_tug.eta_berth is filled in.
- Measures the difference between 'now' and 'times_agency.eta_berth'.
"""
# check, if the header is filled in (agency & TUG)
if len(df_times.loc[df_times["participant_type"].isin([ParticipantType.AGENCY.value, ParticipantType.TUG.value])]) != 2:
return (StatusFlags.GREEN, None)
# preparation: obtain the correct times of the participant, define the query time and the key time
times_agency = self.sql_handler.get_times_for_participant_type(df_times, participant_type=ParticipantType.AGENCY.value)
times_tug = self.sql_handler.get_times_for_participant_type(df_times, participant_type=ParticipantType.TUG.value)
query_time = times_agency.eta_berth
key_time = times_tug.eta_berth
threshold = ParticipantwiseTimeDelta.TUG
violation_state = self.check_time_delta_violation_query_time_to_now(query_time=query_time, key_time=key_time, threshold=threshold)
if violation_state:
validation_name = inspect.currentframe().f_code.co_name
return (StatusFlags.YELLOW, validation_name)
else:
return (StatusFlags.GREEN, None)
def validation_rule_fct_missing_time_tug_berth_etd(self, shipcall, df_times, *args, **kwargs):
"""
Code: #0001-K
Type: Local Rule
Description: this validation checks, whether there is a missing time. When the difference between an event (e.g., the shipcall eta) is below
a certain threshold (e.g., 20 hours), a violation occurs
0001-K:
- Checks, if times_tug.etd_berth is filled in.
- Measures the difference between 'now' and 'times_agency.etd_berth'.
"""
# check, if the header is filled in (agency & TUG)
if len(df_times.loc[df_times["participant_type"].isin([ParticipantType.AGENCY.value, ParticipantType.TUG.value])]) != 2:
return (StatusFlags.GREEN, None)
# preparation: obtain the correct times of the participant, define the query time and the key time
times_agency = self.sql_handler.get_times_for_participant_type(df_times, participant_type=ParticipantType.AGENCY.value)
times_tug = self.sql_handler.get_times_for_participant_type(df_times, participant_type=ParticipantType.TUG.value)
query_time = times_agency.etd_berth
key_time = times_tug.etd_berth
threshold = ParticipantwiseTimeDelta.TUG
violation_state = self.check_time_delta_violation_query_time_to_now(query_time=query_time, key_time=key_time, threshold=threshold)
if violation_state:
validation_name = inspect.currentframe().f_code.co_name
return (StatusFlags.YELLOW, validation_name)
else:
return (StatusFlags.GREEN, None)
def validation_rule_fct_missing_time_terminal_berth_eta(self, shipcall, df_times, *args, **kwargs):
"""
Code: #0001-L
Type: Local Rule
Description: this validation checks, whether there is a missing time. When the difference between an event (e.g., the shipcall eta) is below
a certain threshold (e.g., 20 hours), a violation occurs
0001-L:
- Checks, if times_terminal.eta_berth is filled in.
- Measures the difference between 'now' and 'times_agency.eta_berth'.
"""
# check, if the header is filled in (agency & terminal)
if len(df_times.loc[df_times["participant_type"].isin([ParticipantType.AGENCY.value, ParticipantType.TERMINAL.value])]) != 2:
return (StatusFlags.GREEN, None)
# preparation: obtain the correct times of the participant, define the query time and the key time
times_agency = self.sql_handler.get_times_for_participant_type(df_times, participant_type=ParticipantType.AGENCY.value)
times_terminal = self.sql_handler.get_times_for_participant_type(df_times, participant_type=ParticipantType.TERMINAL.value)
query_time = times_agency.eta_berth
key_time = times_terminal.eta_berth
threshold = ParticipantwiseTimeDelta.TERMINAL
violation_state = self.check_time_delta_violation_query_time_to_now(query_time=query_time, key_time=key_time, threshold=threshold)
if violation_state:
validation_name = inspect.currentframe().f_code.co_name
return (StatusFlags.YELLOW, validation_name)
else:
return (StatusFlags.GREEN, None)
def validation_rule_fct_missing_time_terminal_berth_etd(self, shipcall, df_times, *args, **kwargs):
"""
Code: #0001-K
Type: Local Rule
Description: this validation checks, whether there is a missing time. When the difference between an event (e.g., the shipcall eta) is below
a certain threshold (e.g., 20 hours), a violation occurs
0001-K:
- Checks, if times_terminal.etd_berth is filled in.
- Measures the difference between 'now' and 'times_agency.etd_berth'.
"""
# check, if the header is filled in (agency & terminal)
if len(df_times.loc[df_times["participant_type"].isin([ParticipantType.AGENCY.value, ParticipantType.TERMINAL.value])]) != 2:
return (StatusFlags.GREEN, None)
# preparation: obtain the correct times of the participant, define the query time and the key time
times_agency = self.sql_handler.get_times_for_participant_type(df_times, participant_type=ParticipantType.AGENCY.value)
times_terminal = self.sql_handler.get_times_for_participant_type(df_times, participant_type=ParticipantType.TERMINAL.value)
query_time = times_agency.etd_berth
key_time = times_terminal.etd_berth
threshold = ParticipantwiseTimeDelta.TERMINAL
violation_state = self.check_time_delta_violation_query_time_to_now(query_time=query_time, key_time=key_time, threshold=threshold)
if violation_state:
validation_name = inspect.currentframe().f_code.co_name
return (StatusFlags.YELLOW, validation_name)
else:
return (StatusFlags.GREEN, None)
def validation_rule_fct_shipcall_incoming_participants_disagree_on_eta(self, shipcall, df_times, *args, **kwargs):
"""
Code: #0002-A
Type: Local Rule
Description: this validation checks, whether the participants expect different ETA times
Filter: only applies to incoming shipcalls
"""
query = "eta_berth"
violation_state = self.check_participants_agree_on_estimated_time(
shipcall = shipcall,
query=query,
df_times=df_times,
applicable_shipcall_type=ShipcallType.INCOMING
)
if violation_state:
validation_name = inspect.currentframe().f_code.co_name
return (StatusFlags.RED, validation_name)
else:
return (StatusFlags.GREEN, None)
def validation_rule_fct_shipcall_outgoing_participants_disagree_on_etd(self, shipcall, df_times, *args, **kwargs):
"""
Code: #0002-B
Type: Local Rule
Description: this validation checks, whether the participants expect different ETA times
Filter: only applies to outgoing shipcalls
"""
query = "etd_berth"
violation_state = self.check_participants_agree_on_estimated_time(
shipcall = shipcall,
query=query,
df_times=df_times,
applicable_shipcall_type=ShipcallType.OUTGOING
)
if violation_state:
validation_name = inspect.currentframe().f_code.co_name
return (StatusFlags.RED, validation_name)
else:
return (StatusFlags.GREEN, None)
def validation_rule_fct_shipcall_shifting_participants_disagree_on_eta_or_etd(self, shipcall, df_times, *args, **kwargs):
"""
Code: #0002-C
Type: Local Rule
Description: this validation checks, whether the participants expect different ETA or ETD times
Filter: only applies to shifting shipcalls
"""
violation_state_eta = self.check_participants_agree_on_estimated_time(
shipcall = shipcall,
query="eta_berth",
df_times=df_times,
applicable_shipcall_type=ShipcallType.SHIFTING
)
violation_state_etd = self.check_participants_agree_on_estimated_time(
shipcall = shipcall,
query="etd_berth",
df_times=df_times,
applicable_shipcall_type=ShipcallType.SHIFTING
)
# apply 'eta_berth' check
# apply 'etd_berth'
# violation: if either 'eta_berth' or 'etd_berth' is violated
# functionally, this is the same as individually comparing all times for the participants
# times_agency.eta_berth==times_mooring.eta_berth==times_portadministration.eta_berth==times_pilot.eta_berth==times_tug.eta_berth
# times_agency.etd_berth==times_mooring.etd_berth==times_portadministration.etd_berth==times_pilot.etd_berth==times_tug.etd_berth
violation_state = (violation_state_eta) or (violation_state_etd)
if violation_state:
validation_name = inspect.currentframe().f_code.co_name
return (StatusFlags.RED, validation_name)
else:
return (StatusFlags.GREEN, None)
def validation_rule_fct_eta_time_not_in_operation_window(self, shipcall, df_times, *args, **kwargs):
"""
Code: #0003-A
Type: Local Rule
Description: this validation checks, whether the ETA time is between the provided operations window of the terminal
query time: eta_berth (times_agency)
start_time & end_time: operations_start & operations_end (times_terminal)
"""
# check, if the header is filled in (agency & terminal)
if len(df_times.loc[df_times["participant_type"].isin([ParticipantType.AGENCY.value, ParticipantType.TERMINAL.value])]) != 2:
return (StatusFlags.GREEN, None)
# get agency & terminal times
times_agency = self.sql_handler.get_times_for_participant_type(df_times, participant_type=ParticipantType.AGENCY.value)
times_terminal = self.sql_handler.get_times_for_participant_type(df_times, participant_type=ParticipantType.TERMINAL.value)
if (times_terminal.operations_end is pd.NaT) or (times_agency.etd_berth is pd.NaT):
return (StatusFlags.GREEN, None)
# check, whether the start of operations is AFTER the estimated arrival time
violation_state = times_terminal.operations_start<times_agency.eta_berth
if violation_state:
validation_name = inspect.currentframe().f_code.co_name
return (StatusFlags.RED, validation_name)
else:
return (StatusFlags.GREEN, None)
def validation_rule_fct_eta_time_not_in_operation_window(self, shipcall, df_times, *args, **kwargs):
"""
Code: #0003-B
Type: Local Rule
Description: this validation checks, whether the ETD time is between the provided operations window of the terminal
query time: eta_berth (times_agency)
start_time & end_time: operations_start & operations_end (times_terminal)
"""
# check, if the header is filled in (agency & terminal)
if len(df_times.loc[df_times["participant_type"].isin([ParticipantType.AGENCY.value, ParticipantType.TERMINAL.value])]) != 2:
return (StatusFlags.GREEN, None)
# get agency & terminal times
times_agency = self.sql_handler.get_times_for_participant_type(df_times, participant_type=ParticipantType.AGENCY.value)
times_terminal = self.sql_handler.get_times_for_participant_type(df_times, participant_type=ParticipantType.TERMINAL.value)
if (times_terminal.operations_end is pd.NaT) or (times_agency.etd_berth is pd.NaT):
return (StatusFlags.GREEN, None)
# check, whether the end of operations is AFTER the estimated departure time
violation_state = times_terminal.operations_end > times_agency.etd_berth
if violation_state:
validation_name = inspect.currentframe().f_code.co_name
return (StatusFlags.RED, validation_name)
else:
return (StatusFlags.GREEN, None)
def validation_rule_fct_eta_time_not_in_tidal_window(self, shipcall, df_times, *args, **kwargs):
"""
Code: #0004-A
Type: Local Rule
Description: this validation checks, whether the ETA time is between the provided tidal window
query time: eta_berth (times_agency)
start_time & end_time: tidal_window_from & tidal_window_to (shipcall)
"""
# check, if the header is filled in (agency)
if len(df_times.loc[df_times["participant_type"].isin([ParticipantType.AGENCY.value])]) != 1:
return (StatusFlags.GREEN, None)
times_agency = self.sql_handler.get_times_for_participant_type(df_times, participant_type=ParticipantType.AGENCY.value)
# requirements: tidal window (from & to) is filled in
if (shipcall.tidal_window_from is pd.NaT) or (shipcall.tidal_window_to is pd.NaT) or (df_times.eta_berth is pd.NaT):
return (StatusFlags.GREEN, None)
# check, whether the query time is between start & end time
# a violation is observed, when the is NOT between start & end
violation_state = not self.time_logic.time_inbetween(query_time=times_agency.eta_berth, start_time=shipcall.tidal_window_from, end_time=shipcall.tidal_window_to)
if violation_state:
validation_name = inspect.currentframe().f_code.co_name
return (StatusFlags.RED, validation_name)
else:
return (StatusFlags.GREEN, None)
def validation_rule_fct_etd_time_not_in_tidal_window(self, shipcall, df_times, *args, **kwargs):
"""
Code: #0004-B
Type: Local Rule
Description: this validation checks, whether the ETD time is between the provided tidal window
query time: eta_berth (times_agency)
start_time & end_time: tidal_window_from & tidal_window_to (shipcall)
"""
# check, if the header is filled in (agency)
if len(df_times.loc[df_times["participant_type"].isin([ParticipantType.AGENCY.value])]) != 1:
return (StatusFlags.GREEN, None)
times_agency = self.sql_handler.get_times_for_participant_type(df_times, participant_type=ParticipantType.AGENCY.value)
# requirements: tidal window (from & to) is filled in
if (shipcall.tidal_window_from is pd.NaT) or (shipcall.tidal_window_to is pd.NaT) or (df_times.eta_berth is pd.NaT):
return (StatusFlags.GREEN, None)
# check, whether the query time is between start & end time
# a violation is observed, when the is NOT between start & end
violation_state = not self.time_logic.time_inbetween(query_time=times_agency.etd_berth, start_time=shipcall.tidal_window_from, end_time=shipcall.tidal_window_to)
if violation_state:
validation_name = inspect.currentframe().f_code.co_name
return (StatusFlags.RED, validation_name)
else:
return (StatusFlags.GREEN, None)
def validation_rule_fct_too_many_identical_eta_times(self, shipcall, df_times, rounding = "min", maximum_threshold = 3, *args, **kwargs):
"""
Code: #0005-A
Type: Global Rule
Description: this validation rule checks, whether there are too many shipcalls with identical ETA times.
"""
# when ANY of the unique values exceeds the threshold, a violation is observed
query = "eta_berth"
violation_state = self.check_unique_shipcall_counts(query, rounding=rounding, maximum_threshold=maximum_threshold)
if violation_state:
validation_name = inspect.currentframe().f_code.co_name
return (StatusFlags.YELLOW, validation_name)
else:
return (StatusFlags.GREEN, None)
def validation_rule_fct_too_many_identical_etd_times(self, shipcall, df_times, rounding = "min", maximum_threshold = 3, *args, **kwargs):
"""
Code: #0005-B
Type: Global Rule
Description: this validation rule checks, whether there are too many shipcalls with identical ETD times.
"""
# when ANY of the unique values exceeds the threshold, a violation is observed
query = "etd_berth"
violation_state = self.check_unique_shipcall_counts(query, rounding=rounding, maximum_threshold=maximum_threshold)
if violation_state:
validation_name = inspect.currentframe().f_code.co_name
return (StatusFlags.YELLOW, validation_name)
else:
return (StatusFlags.GREEN, None)
def validation_rule_fct_agency_and_terminal_berth_id_disagreement(self, shipcall, df_times, *args, **kwargs):
"""
Code: #0006-A
Type: Local Rule
Description: This validation rule checks, whether agency and terminal agree with their designated berth place by checking berth_id.
"""
# check, if the header is filled in (agency & terminal)
if len(df_times.loc[df_times["participant_type"].isin([ParticipantType.AGENCY.value, ParticipantType.TERMINAL.value])]) != 2:
return (StatusFlags.GREEN, None)
times_agency = self.sql_handler.get_times_for_participant_type(df_times, participant_type=ParticipantType.AGENCY.value)
times_terminal = self.sql_handler.get_times_for_participant_type(df_times, participant_type=ParticipantType.TERMINAL.value)
violation_state = times_agency.berth_id!=times_terminal.berth_id
if violation_state:
validation_name = inspect.currentframe().f_code.co_name
return (StatusFlags.YELLOW, validation_name)
else:
return (StatusFlags.GREEN, None)
def validation_rule_fct_agency_and_terminal_pier_side_disagreement(self, shipcall, df_times, *args, **kwargs):
"""
Code: #0006-B
Type: Local Rule
Description: This validation rule checks, whether agency and terminal agree with their designated pier side by checking pier_side.
"""
# check, if the header is filled in (agency & terminal)
if len(df_times.loc[df_times["participant_type"].isin([ParticipantType.AGENCY.value, ParticipantType.TERMINAL.value])]) != 2:
return (StatusFlags.GREEN, None)
times_agency = self.sql_handler.get_times_for_participant_type(df_times, participant_type=ParticipantType.AGENCY.value)
times_terminal = self.sql_handler.get_times_for_participant_type(df_times, participant_type=ParticipantType.TERMINAL.value)
violation_state = times_agency.pier_side!=times_terminal.pier_side
if violation_state:
validation_name = inspect.currentframe().f_code.co_name
return (StatusFlags.YELLOW, validation_name)
else:
return (StatusFlags.GREEN, None)

View File

@ -0,0 +1,128 @@
import copy
import numpy as np
import pandas as pd
from BreCal.database.enums import StatusFlags
from BreCal.validators.validation_rule_functions import ValidationRuleFunctions
from BreCal.schemas.model import Shipcall
class ValidationRules(ValidationRuleFunctions):
"""
An object that determines the traffic light state for validation and notification. The provided feedback ('green', 'yellow', 'red')
determines, whether the state is critical. It uses ValidationRuleState enumerations.
In case of a critical validation state, the user's input prompt may be interrupted and the user may be warned.
In case of a critical notification state, the respective users will be automatically notified after n seconds. (#TODO_n_seconds_delay)
"""
def __init__(self, sql_handler): # use the entire data that is provided for this query (e.g., json input)
super().__init__(sql_handler)
self.validation_state = self.determine_validation_state()
# currently flagged: notification_state initially was based on using one ValidationRules object for each query. This is deprecated.
# self.notification_state = self.determine_notification_state() # (state:str, should_notify:bool)
return
def evaluate(self, shipcall):
"""
1.) prepare df_times, which every validation rule tends to use
calling this only once saves a lot of computational overhead
2.) apply all validation rules
returns: (evaluation_state, violations)
"""
# prepare df_times, which every validation rule tends to use
df_times = self.sql_handler.df_dict.get('times') # -> pd.DataFrame
# filter by shipcall id
df_times = self.sql_handler.get_times_of_shipcall(shipcall)
# apply all validation rules
# list of tuples, where each element is (state, msg)
evaluation_results = [elem(shipcall, df_times) for elem in self.get_validation_rule_functions()]
# filter out all 'None' results, which indicate that no violation occured.
evaluation_results = [evaluation_result for evaluation_result in evaluation_results if evaluation_result[1] is not None]
""" # deprecated
# check, if ANY of the evaluation results (evaluation_state) is larger than the .GREEN state. This means, that .YELLOW and .RED
# would return 'True'. Numpy arrays and functions are used to accelerate the comparison.
# np.any returns a boolean.
#evaluation_state = not np.any(np.greater(np.array([result[0] for result in evaluation_results]), ValidationRuleState.GREEN))
"""
# check, what the maximum state flag is and return it
evaluation_state = np.max(np.array([result[0].value for result in evaluation_results])) if len(evaluation_results)>0 else 1
evaluation_verbosity = [result[1] for result in evaluation_results]
return (evaluation_state, evaluation_verbosity)
def evaluation_verbosity(self, evaluation_state, evaluation_results):
"""This function suggestions verbosity for the evaluation results. Based on 'True'/'False' evaluation outcome, the returned string is different."""
if evaluation_state:
return f"OK! The validation was successful. There are no rule violations."
else:
verbose_string = "These are:" + "\n\t".join(evaluation_results) # every element of the list will be displayed in a new line with a tab
return f"FAILED VALIDATION. There have been {len(evaluation_results)} violations. {verbose_string}"
def evaluate_shipcall_from_df(self, x):
shipcall = Shipcall(**{**{'id':x.name}, **x.to_dict()})
evaluation_state, violations = self.evaluate(shipcall)
return evaluation_state, violations
def evaluate_shipcalls(self, shipcall_df:pd.DataFrame)->pd.DataFrame:
"""apply 'evaluate_shipcall_from_df' to each individual shipcall in {shipcall_df}. Returns shipcall_df ('evaluation' and 'evaluation_message' are updated)"""
results = shipcall_df.apply(lambda x: self.evaluate_shipcall_from_df(x), axis=1).values
# unbundle individual results. evaluation_state becomes an integer, violation
evaluation_state = [StatusFlags(res[0]).value for res in results]
violations = [",".join(res[1]) if len(res[1])>0 else None for res in results]
shipcall_df.loc[:,"evaluation"] = evaluation_state
shipcall_df.loc[:,"evaluation_message"] = violations
return shipcall_df
def determine_validation_state(self) -> str:
"""
this method determines the validation state of a shipcall. The state is either ['green', 'yellow', 'red'] and signals,
whether an entry causes issues within the workflow of users.
returns: validation_state_new (str)
"""
(validation_state_new, description) = self.undefined_method()
# should there also be notifications for critical validation states? In principle, the traffic light itself provides that notification.
self.validation_state = validation_state_new
return validation_state_new
def determine_notification_state(self) -> (str, bool):
"""
this method determines state changes in the notification state. When the state is changed to yellow or red,
a user is notified about it. The only exception for this rule is when the state was yellow or red before,
as the user has then already been notified.
returns: notification_state_new (str), should_notify (bool)
"""
(state_new, description) = self.undefined_method() # determine the successor
should_notify = self.identify_notification_state_change(state_new)
self.notification_state = state_new # overwrite the predecessor
return state_new, should_notify
def identify_notification_state_change(self, state_new) -> bool:
"""
determines, whether the observed state change should trigger a notification.
internally, this function maps a color string to an integer and determines, if the successor state is more severe than the predecessor.
state changes trigger a notification in the following cases:
green -> yellow
green -> red
yellow -> red
(none -> yellow) or (none -> red)
due to the values in the enumeration objects, the states are mapped to provide this function.
green=1, yellow=2, red=3, none=1. Hence, critical changes can be observed by simply checking with "greater than".
returns bool, whether a notification should be triggered
"""
# state_old is always considered at least 'Green' (1)
state_old = max(copy.copy(self.notification_state) if "notification_state" in list(self.__dict__.keys()) else StatusFlags.NONE, StatusFlags.GREEN.value)
return state_new.value > state_old.value
def undefined_method(self) -> str:
"""this function should apply the ValidationRules to the respective .shipcall, in regards to .times"""
# #TODO_traffic_state
return (StatusFlags.GREEN, False) # (state:str, should_notify:bool)

View File

@ -3,11 +3,21 @@ Flask==1.1.2
itsdangerous==1.1.0
Jinja2==2.11.2
MarkupSafe==1.1.1
marshmallow==3.9.1
marshmallow>=3.9.1
webargs==6.1.1
Werkzeug==1.0.1
pydapper[mysql-connector-python]
marshmallow-dataclass
bcrypt
jwt
flask-jwt-extended
pyjwt
flask-jwt-extended
numpy
pandas
tqdm
schedule
pytest
pytest-cov
coverage
../server/.

View File

View File

View File

@ -0,0 +1,52 @@
import unittest
import pytest
def test_execute_coverage_test():
"""
executes {execute_coverage_test} to check, whether reporting works as expected
"""
import os
import BreCal.brecal_utils
from BreCal.brecal_utils.file_handling import get_project_root
from BreCal.brecal_utils.test_handling import execute_coverage_test
# find the root folder 'server'
root_dir = BreCal.brecal_utils.__file__
root_dir = get_project_root("server", root_dir=root_dir)
# find the test path, the library path and the coverage report path
tests_path = os.path.join(root_dir, "tests")
coverage_path = os.path.join(root_dir, "BreCal")
report_path = os.path.join(root_dir, "coverage_reports")
with pytest.raises(KeyboardInterrupt, match="is_test_interrupt"):
execute_coverage_test(tests_path=tests_path, coverage_path=coverage_path, cov_report_dst_dir=report_path, cov_fail_under_rate=0, is_test=1)
return
def test_execute_coverage_test_no_report():
"""
executes {execute_coverage_test} to check, whether the function also works without reporting
"""
import os
import BreCal.brecal_utils
from BreCal.brecal_utils.file_handling import get_project_root
from BreCal.brecal_utils.test_handling import execute_coverage_test
# find the root folder 'server'
root_dir = BreCal.brecal_utils.__file__
root_dir = get_project_root("server", root_dir=root_dir)
# find the test path, the library path and the coverage report path
tests_path = os.path.join(root_dir, "tests")
coverage_path = os.path.join(root_dir, "BreCal")
report_path = os.path.join(root_dir, "coverage_reports")
with pytest.raises(KeyboardInterrupt, match="is_test_interrupt"):
execute_coverage_test(tests_path=tests_path, coverage_path=coverage_path, cov_report_dst_dir=None, cov_fail_under_rate=0, is_test=1)
return
if __name__=="__main__":
pass

View File

@ -0,0 +1,52 @@
import pytest
def test_difference_to_then_tgt_time_none():
import math
import datetime
from BreCal import difference_to_then
difference_in_seconds = 42
event_time = datetime.datetime.now() - datetime.timedelta(seconds=difference_in_seconds)
event_time_diff = difference_to_then(event_time) # tgt_time = datetime.datetime.now()
# {difference_to_then} internally creates a .now() time, when the {then_time} is not defined
# hence, the difference will never be exactly 42 seconds due to slight latency
# math.isclose allows deviations up to 0.05 seconds
assert math.isclose(42, event_time_diff, abs_tol=0.05), f"both times are reasonably close"
return
def test_difference_to_then_tgt_time_not_none():
import math
import datetime
from BreCal import difference_to_then
difference_in_seconds = 42
event_time = datetime.datetime(2000, 1, 1, 0, 0, 0)
tgt_time = event_time - datetime.timedelta(seconds=difference_in_seconds)
event_time_diff = difference_to_then(event_time, tgt_time)
# tgt time is -42 seconds, as it is 42 seconds before event_time
assert event_time_diff==-42, f"event time difference is incorrect"
return
def test_difference_to_then_tgt_time_not_none_make_absolute():
import math
import datetime
from BreCal import difference_to_then
difference_in_seconds = 42
event_time = datetime.datetime(2000, 1, 1, 0, 0, 0)
tgt_time = event_time - datetime.timedelta(seconds=difference_in_seconds)
event_time_diff = difference_to_then(event_time, tgt_time, make_absolute=True) # difference: -42. make_absolute: +42
# tgt time is -42 seconds, as it is 42 seconds before event_time. However, we are interested in an absolute value
assert event_time_diff==42, f"event time difference is incorrect"
return
if __name__=="__main__":
test_difference_to_then_tgt_time_none()
test_difference_to_then_tgt_time_not_none()
test_difference_to_then_tgt_time_not_none_make_absolute()

View File

View File

View File

View File

View File

@ -0,0 +1,59 @@
import pytest
def test_build_stub_berth():
from BreCal.schemas.model import Berth
from BreCal.stubs.berth import get_berth_simple
berth = get_berth_simple()
assert isinstance(berth, Berth)
return
def test_build_stub_participant():
from BreCal.schemas.model import Participant
from BreCal.stubs.participant import get_participant_simple
participant = get_participant_simple()
assert isinstance(participant, Participant)
return
def test_build_stub_user():
from BreCal.schemas.model import User
from BreCal.stubs.user import get_user_simple
user = get_user_simple()
assert isinstance(user, User)
return
def test_build_stub_ship():
from BreCal.schemas.model import Ship
from BreCal.stubs.ship import get_ship_simple
ship = get_ship_simple()
assert isinstance(ship, Ship)
return
def test_build_stub_shipcall():
from BreCal.schemas.model import Shipcall
from BreCal.stubs.shipcall import get_shipcall_simple
shipcall = get_shipcall_simple()
assert isinstance(shipcall, Shipcall)
return
def test_build_stub_times():
from BreCal.schemas.model import Times
from BreCal.stubs.times_full import get_times_full_simple
times = get_times_full_simple()
assert isinstance(times, Times)
return
def test_build_stub_notification():
from BreCal.schemas.model import Notification
from BreCal.stubs.notification import get_notification_simple
notification = get_notification_simple()
assert isinstance(notification, Notification)
if __name__=="__main__":
test_build_stub_berth()
test_build_stub_participant()
test_build_stub_berth()
test_build_stub_user()
test_build_stub_ship()
test_build_stub_shipcall()
test_build_stub_times()
test_build_stub_notification()

View File

@ -0,0 +1,21 @@
import pytest
def test_create_app():
"""
"""
import os
import sys
from BreCal import get_project_root
project_root = get_project_root("brecal")
lib_location = os.path.join(project_root, "src", "server")
sys.path.append(lib_location)
from BreCal import create_app
os.chdir(project_root) # set the current directory to ~/brecal, so the config is found
application = create_app()
return
if __name__=="__main__":
test_create_app()

View File

@ -0,0 +1,86 @@
import pytest
def test_import_tqdm_tqdm():
"""tqdm is a neat utility library for simple display of progress in loops"""
from tqdm import tqdm
return
def test_import_numpy():
"""numpy is useful to evaluate multiple entries simultaneously, as boolean operations (e.g., greater than) are efficiently handled"""
import numpy as np
return
def test_import_pandas():
"""pandas is useful to handle dataframes and read from .csv or .json files, which can be collected into joint DataFrame objects"""
import pandas as pd
return
def test_import_flask():
"""flask is a WSGI framework for quick and easy design of web-based applications"""
import flask
from flask import Flask, Blueprint, request
return
def test_import_flask_specific_objects():
"""common flask objects, such as the Flask api object, the Blueprint and requests"""
from flask import Flask, Blueprint, request
return
def test_import_mysql_connector():
"""the 'mysql.connector' Object is used for the BreCal server database"""
import mysql.connector
return
def test_import_pydapper():
"""is a library that provides convenient methods for database related work"""
import pydapper
return
def test_import_webargs():
"""currently used in ~/brecal/src/server/BreCal/api/berths.py"""
import webargs
from webargs.flaskparser import parser
return
def test_import_mashmallow():
"""currently used in ~/brecal/src/server/BreCal/api/shipcalls.py"""
import marshmallow
from marshmallow import Schema, fields
return
def test_import_flask_jwt_extended():
"""currently used in ~/brecal/src/server/BreCal/api/login.py"""
import flask_jwt_extended
from flask_jwt_extended import create_access_token
return
def test_import_pyjwt():
"""currently used in ~/brecal/src/server/BreCal/services/jwt_handler.py"""
import jwt
return
def test_import_bcrypt():
"""currently used in ~/brecal/src/server/BreCal/impl/login.py"""
import bcrypt
return
def test_import_math():
"""math.isclose can be interesting to measure differences between two times (e.g., to ignore milliseconds)"""
import math
math.isclose
return
def test_import_datetime():
"""datetime is the default library for times"""
import datetime
datetime.datetime.now()
return
if __name__=="__main__":
test_import_tqdm_tqdm()
test_import_pandas()
test_import_flask()

View File

View File

@ -0,0 +1,63 @@
import pytest
@pytest.fixture()
def build_input_validation():
from BreCal.validators.input_validation import InputValidation
iv = InputValidation()
return locals()
def test_build_input_validation():
from BreCal.validators.input_validation import InputValidation
iv = InputValidation()
return
def test_all_models_are_supported(build_input_validation):
iv = build_input_validation["iv"]
from BreCal.stubs.ship import get_ship_simple
ship = get_ship_simple()
iv.assert_if_not_supported(ship)
from BreCal.stubs.shipcall import get_shipcall_simple
shipcall = get_shipcall_simple()
iv.assert_if_not_supported(shipcall)
from BreCal.stubs.berth import get_berth_simple
berth = get_berth_simple()
iv.assert_if_not_supported(berth)
from BreCal.stubs.participant import get_participant_simple
participant = get_participant_simple()
iv.assert_if_not_supported(participant)
from BreCal.stubs.user import get_user_simple
user = get_user_simple()
iv.assert_if_not_supported(user)
# placeholder: how to handle times?
return
def test_ship_input_validation(build_input_validation):
iv = build_input_validation["iv"]
from BreCal.stubs.ship import get_ship_simple
ship = get_ship_simple()
violations, state = iv.verify(ship)
assert state, f"found violations: {violations}"
return
def test_participant_input_validation(build_input_validation):
iv = build_input_validation["iv"]
from BreCal.stubs.participant import get_participant_simple
participant = get_participant_simple()
violations, state = iv.verify(participant)
assert state, f"found violations: {violations}"
return
if __name__=="__main__":
pass

View File

@ -0,0 +1,14 @@
import pytest
from BreCal.stubs.berth import get_berth_simple
def test_berth():
with pytest.raises(ValueError, match="#TODO: copied from ships."):
berth = get_berth_simple()
raise ValueError("#TODO: copied from ships.")
from BreCal.validators.schema_validation import test____
ship = get_ship_simple()
ship.length = 234
assert ship_length_in_range(ship)[0], f"ship length must be between 0 and 500 meters"
return

View File

@ -0,0 +1,28 @@
import pytest
from BreCal.stubs.participant import get_participant_simple
def test_participant_postal_code_len_is_five():
from BreCal.validators.schema_validation import participant_postal_code_len_is_five
participant = get_participant_simple()
assert participant_postal_code_len_is_five(participant)[0], f"the postal code should be exactly 5 numbers"
return
def test_participant_postal_code_len_is_six_should_assert():
from BreCal.validators.schema_validation import participant_postal_code_len_is_five
participant = get_participant_simple()
participant.postal_code = "123456"
with pytest.raises(AssertionError, match="the postal code should be exactly 5 numbers"):
assert participant_postal_code_len_is_five(participant)[0], f"the postal code should be exactly 5 numbers"
return
# TODO_postal_code_zero -> assert? Is postal_code mandatory?
if __name__=="__main__":
test_participant_postal_code_len_is_five()
test_participant_postal_code_len_is_six_should_assert()

View File

@ -0,0 +1,270 @@
import pytest
from BreCal.stubs.ship import get_ship_simple
def test_ship_length_valid_range_234_is_valid():
from BreCal.validators.schema_validation import ship_length_in_range
ship = get_ship_simple()
ship.length = 234
assert ship_length_in_range(ship)[0], f"ship length must be between 0 and 500 meters"
return
def test_ship_length_maximum_not_valid_range():
from BreCal.validators.schema_validation import ship_length_in_range
ship = get_ship_simple()
ship.length = 500
with pytest.raises(AssertionError):
assert ship_length_in_range(ship)[0], f"ship length must be between 0 and 500 meters, but is 500"
return
def test_ship_length_minimum_not_valid_range():
from BreCal.validators.schema_validation import ship_length_in_range
ship = get_ship_simple()
ship.length = 0
with pytest.raises(AssertionError):
assert ship_length_in_range(ship)[0], f"ship length must be between 0 and 500 meters, but is 0"
return
def test_ship_width_valid_range_137_is_valid():
from BreCal.validators.schema_validation import ship_width_in_range
ship = get_ship_simple()
ship.width = 137
assert ship_width_in_range(ship)[0], f"ship width must be between 0 and 500 meters"
return
def test_ship_width_maximum_not_valid_range():
from BreCal.validators.schema_validation import ship_width_in_range
ship = get_ship_simple()
ship.width = 500
with pytest.raises(AssertionError):
assert ship_width_in_range(ship)[0], f"ship width must be between 0 and 500 meters, but is 500"
return
def test_ship_width_minimum_not_valid_range():
from BreCal.validators.schema_validation import ship_width_in_range
ship = get_ship_simple()
ship.width = 0
with pytest.raises(AssertionError):
assert ship_width_in_range(ship)[0], f"ship width must be between 0 and 500 meters, but is 0"
return
# not tug: values can be None and raise no error
def test_ship_bollard_pull_is_none_and_not_tug():
from BreCal.validators.schema_validation import ship_bollard_pull_is_none_or_in_range
ship = get_ship_simple()
ship.is_tug = False
ship.bollard_pull = None
assert ship_bollard_pull_is_none_or_in_range(ship)[0], f"the bollard_pull should either be undefined or between 0 and 500 meters"
return
def test_ship_participant_id_is_none_and_not_tug():
from BreCal.validators.schema_validation import ship_participant_id_is_none_or_int
ship = get_ship_simple()
ship.is_tug = False
ship.participant_id = None
assert ship_participant_id_is_none_or_int(ship)[0], f"the participant_id should either be undefined or an integer id"
return
def test_ship_max_draft_is_none_and_not_tug():
from BreCal.validators.schema_validation import ship_max_draft_is_none_or_in_range
ship = get_ship_simple()
ship.is_tug = False
ship.max_draft = None
assert ship_max_draft_is_none_or_in_range(ship)[0], f"the max_draft should either be undefined or between 0 and 20 meters"
return
# tug: values must be set, and are set. all tests should be accepted without assertion
def test_ship_is_tug_bollard_pull_is_not_none():
from BreCal.validators.schema_validation import ship_bollard_pull_is_none_or_in_range
ship = get_ship_simple()
ship.is_tug = True
ship.bollard_pull = 311
assert ship_bollard_pull_is_none_or_in_range(ship)[0], f"the bollard_pull should either be undefined or between 0 and 500 meters"
return
def test_ship_is_tug_max_draft_is_not_none():
from BreCal.validators.schema_validation import ship_max_draft_is_none_or_in_range
ship = get_ship_simple()
ship.is_tug = True
ship.max_draft = 17
assert ship_max_draft_is_none_or_in_range(ship)[0], f"the max_draft should either be undefined or between 0 and 20 meters"
return
def test_ship_is_tug_participant_id_is_not_none():
from BreCal.validators.schema_validation import ship_participant_id_is_none_or_int
from BreCal.stubs import generate_uuid1_int
ship = get_ship_simple()
ship.is_tug = True
ship.participant_id = generate_uuid1_int()
assert ship_participant_id_is_none_or_int(ship)[0], f"the participant_id should either be undefined or an integer id"
return
def test_ship_is_tug_participant_id_is_str_and_fails():
from BreCal.validators.schema_validation import ship_participant_id_is_none_or_int
# note: this is an artificial test case. However, it ensures that operators using the backend cannot create an id incorrectly
from BreCal.stubs import generate_uuid1_int
ship = get_ship_simple()
ship.is_tug = True
ship.participant_id = str(generate_uuid1_int())
with pytest.raises(AssertionError):
assert ship_participant_id_is_none_or_int(ship)[0], f"the participant_id should either be None or int, but is str"
return
# tug: values must be set, but are not. all tests should raise AssertionError
def test_ship_is_tug_bollard_pull_but_is_none_fails():
from BreCal.validators.schema_validation import ship_bollard_pull_is_defined_or_is_not_tug
ship = get_ship_simple()
ship.is_tug = True
ship.bollard_pull = None
with pytest.raises(AssertionError):
assert ship_bollard_pull_is_defined_or_is_not_tug(ship)[0], f"the bollard_pull cannot be None, if the ship is a tug"
return
def test_ship_is_tug_max_draft_but_is_none_fails():
from BreCal.validators.schema_validation import ship_max_draft_is_defined_or_is_not_tug
ship = get_ship_simple()
ship.is_tug = True
ship.max_draft = None
with pytest.raises(AssertionError):
assert ship_max_draft_is_defined_or_is_not_tug(ship)[0], f"the max_draft cannot be None, if the ship is a tug"
return
def test_ship_is_tug_participant_id_but_is_none_fails():
from BreCal.validators.schema_validation import ship_participant_id_is_defined_or_is_not_tug
ship = get_ship_simple()
ship.is_tug = True
ship.participant_id = None
with pytest.raises(AssertionError):
assert ship_participant_id_is_defined_or_is_not_tug(ship)[0], f"the participant_id cannot be None, if the ship is a tug"
return
# tug: values must be in valid range
# # sequence: 1.) is valid, 2.) is too small, 3.) is too large
def test_ship_is_tug_bollard_pull_in_range_311_valid():
from BreCal.validators.schema_validation import ship_bollard_pull_is_none_or_in_range
ship = get_ship_simple()
ship.is_tug = True
ship.bollard_pull = 311
assert ship_bollard_pull_is_none_or_in_range(ship)[0], f"the bollard_pull must be 0<value<500"
return
def test_ship_is_tug_bollard_pull_in_range_minimum_not_valid():
from BreCal.validators.schema_validation import ship_bollard_pull_is_none_or_in_range
ship = get_ship_simple()
ship.is_tug = True
ship.bollard_pull = 0
with pytest.raises(AssertionError):
assert ship_bollard_pull_is_none_or_in_range(ship)[0], f"the bollard_pull must be 0<value<500, but is 0"
return
def test_ship_is_tug_bollard_pull_in_range_maximum_not_valid():
from BreCal.validators.schema_validation import ship_bollard_pull_is_none_or_in_range
ship = get_ship_simple()
ship.is_tug = True
ship.bollard_pull = 500
with pytest.raises(AssertionError):
assert ship_bollard_pull_is_none_or_in_range(ship)[0], f"the bollard_pull must be 0<value<500, but is 500"
return
def test_ship_is_tug_max_draft_in_range_11_valid():
from BreCal.validators.schema_validation import ship_max_draft_is_none_or_in_range
ship = get_ship_simple()
ship.is_tug = True
ship.max_draft = 11
assert ship_max_draft_is_none_or_in_range(ship)[0], f"the max_draft must be 0<value<20"
return
def test_ship_is_tug_max_draft_in_range_minimum_not_valid():
from BreCal.validators.schema_validation import ship_max_draft_is_none_or_in_range
ship = get_ship_simple()
ship.is_tug = True
ship.max_draft = 0
with pytest.raises(AssertionError):
assert ship_max_draft_is_none_or_in_range(ship)[0], f"the max_draft must be 0<value<20, but is 0"
return
def test_ship_is_tug_max_draft_in_range_maximum_not_valid():
from BreCal.validators.schema_validation import ship_max_draft_is_none_or_in_range
ship = get_ship_simple()
ship.is_tug = True
ship.max_draft = 20
with pytest.raises(AssertionError):
assert ship_max_draft_is_none_or_in_range(ship)[0], f"the max_draft must be 0<value<20, but is 20"
return
# Length tests
def test_ship_eni_len_is_eight_and_passes():
from BreCal.validators.schema_validation import ship_eni_len_is_eight
ship = get_ship_simple()
ship.eni = "01234567" # 8 character example
assert ship_eni_len_is_eight(ship)[0], f"the eni-no. should have exactly 8 characters"
return
def test_ship_eni_len_is_eight_but_has_nine():
from BreCal.validators.schema_validation import ship_eni_len_is_eight
ship = get_ship_simple()
ship.eni = "012345678" # 9 character example
with pytest.raises(AssertionError):
assert ship_eni_len_is_eight(ship)[0], f"the eni-no. should have exactly 8 characters, but has 9 characters"
return
def test_ship_callsign_len_is_seven_at_maximum_seven_passes():
from BreCal.validators.schema_validation import ship_callsign_len_is_seven_at_maximum
ship = get_ship_simple()
ship.callsign = "0123456" # 7 character example
assert ship_callsign_len_is_seven_at_maximum(ship)[0], f"the callsign no. should have at maximum 7 characters"
return
def test_ship_callsign_len_is_seven_at_maximum_six_passes():
from BreCal.validators.schema_validation import ship_callsign_len_is_seven_at_maximum
ship = get_ship_simple()
ship.callsign = "012345" # 6 character example
assert ship_callsign_len_is_seven_at_maximum(ship)[0], f"the callsign no. should have at maximum 7 characters"
return
def test_ship_callsign_len_is_seven_at_maximum_eight_fails():
from BreCal.validators.schema_validation import ship_callsign_len_is_seven_at_maximum
ship = get_ship_simple()
ship.callsign = "01234567" # 8 character example
with pytest.raises(AssertionError):
assert ship_callsign_len_is_seven_at_maximum(ship)[0], f"the callsign no. should have at maximum 7 characters, but has 8"
return
def test_ship_callsign_len_is_seven_at_maximum_zero_passes():
from BreCal.validators.schema_validation import ship_callsign_len_is_seven_at_maximum
ship = get_ship_simple()
ship.callsign = "" # 0 character example
assert ship_callsign_len_is_seven_at_maximum(ship)[0], f"the callsign no. should have at maximum 7 characters"
return
def test_imo_len_is_seven_and_seven_passes():
from BreCal.validators.schema_validation import ship_imo_len_is_seven
ship = get_ship_simple()
ship.imo = 1234567 # integer required
assert ship_imo_len_is_seven(ship)[0], f"a ship's IMO no. should have exactly 7 characters"
return
def test_imo_len_is_seven_and_eight_fails():
from BreCal.validators.schema_validation import ship_imo_len_is_seven
ship = get_ship_simple()
ship.imo = 12345678 # integer required
with pytest.raises(AssertionError):
assert ship_imo_len_is_seven(ship)[0], f"a ship's IMO no. should have exactly 7 characters, but it has 8 character"
return
def test_imo_len_is_seven_and_one_fails():
from BreCal.validators.schema_validation import ship_imo_len_is_seven
ship = get_ship_simple()
ship.imo = 1 # integer required
with pytest.raises(AssertionError):
assert ship_imo_len_is_seven(ship)[0], f"a ship's IMO no. should have exactly 7 characters, but is has 1 character"
return
if __name__=="__main__":
pass

View File

@ -0,0 +1,159 @@
import pytest
from BreCal.validators.validation_rule_functions import ValidationRuleFunctions
from BreCal.validators.validation_rules import ValidationRules
from BreCal.database.sql_handler import SQLHandler
@pytest.fixture(scope="session")
def build_sql_proxy_connection():
import mysql.connector
conn_from_pool = mysql.connector.connect(**{'host':'localhost', 'port':3306, 'user':'root', 'password':'HalloWach_2323XXL!!', 'pool_name':'brecal_pool', 'pool_size':20, 'database':'bremen_calling', 'autocommit': True})
sql_handler = SQLHandler(sql_connection=conn_from_pool, read_all=True)
vr = ValidationRules(sql_handler)
return locals()
def test_build_validation_rule_functions(build_sql_proxy_connection):
import types
sql_handler = build_sql_proxy_connection["sql_handler"]
vr = build_sql_proxy_connection["vr"]
validation_rule_functions = vr.get_validation_rule_functions()
assert isinstance(validation_rule_functions, list), f"must return a list of methods"
for vrule in validation_rule_functions:
assert isinstance(vrule,types.MethodType), f"every element returned from get_validation_rule_functions must be a method. found: {type(vrule)}"
assert len(validation_rule_functions)>0, f"must return at least one method!"
return
def test_validation_rule_fct_agency_and_terminal_pier_side_disagreement(build_sql_proxy_connection):
"""#0006-A validation_rule_fct_agency_and_terminal_pier_side_disagreement"""
import pandas as pd
from BreCal.stubs.times_full import get_times_full_simple
from BreCal.stubs.shipcall import get_shipcall_simple
from BreCal.database.enums import ParticipantType
from BreCal.database.enums import StatusFlags
vr = build_sql_proxy_connection["vr"]
shipcall = get_shipcall_simple()
t1 = get_times_full_simple()
t2 = get_times_full_simple()
# roles: agency & terminal
t1.participant_type = ParticipantType.AGENCY.value
t2.participant_type = ParticipantType.TERMINAL.value
# disagreement
t1.pier_side = True
t2.pier_side = False
time_objects = [t1, t2]
df_times = pd.DataFrame.from_records([to_.__dict__ for to_ in time_objects])
df_times.set_index('id',inplace=True)
(state, description) = vr.validation_rule_fct_agency_and_terminal_pier_side_disagreement(shipcall, df_times)
assert state.value > StatusFlags.GREEN.value, f"a violation must be identified"
assert description is not None, f"a violation description must be identified"
return
def test_validation_rule_fct_agency_and_terminal_pier_side_agreement(build_sql_proxy_connection):
"""#0006-A validation_rule_fct_agency_and_terminal_pier_side_disagreement"""
import pandas as pd
from BreCal.stubs.times_full import get_times_full_simple
from BreCal.stubs.shipcall import get_shipcall_simple
from BreCal.database.enums import ParticipantType
from BreCal.database.enums import StatusFlags
vr = build_sql_proxy_connection["vr"]
shipcall = get_shipcall_simple()
t1 = get_times_full_simple()
t2 = get_times_full_simple()
# roles: agency & terminal
t1.participant_type = ParticipantType.AGENCY.value
t2.participant_type = ParticipantType.TERMINAL.value
# agreement
t1.pier_side = True
t2.pier_side = True
time_objects = [t1, t2]
df_times = pd.DataFrame.from_records([to_.__dict__ for to_ in time_objects])
df_times.set_index('id',inplace=True)
(state, description) = vr.validation_rule_fct_agency_and_terminal_pier_side_disagreement(shipcall, df_times)
assert state.value == StatusFlags.GREEN.value, f"no violation should be observed"
assert description is None, f"no violation should be observed"
return
def test_validation_rule_fct_agency_and_terminal_berth_id_disagreement(build_sql_proxy_connection):
"""#0006-B validation_rule_fct_agency_and_terminal_pier_side_disagreement"""
import pandas as pd
from BreCal.stubs.times_full import get_times_full_simple
from BreCal.stubs.shipcall import get_shipcall_simple
from BreCal.database.enums import ParticipantType
from BreCal.database.enums import StatusFlags
vr = build_sql_proxy_connection["vr"]
shipcall = get_shipcall_simple()
t1 = get_times_full_simple()
t2 = get_times_full_simple()
# roles: agency & terminal
t1.participant_type = ParticipantType.AGENCY.value
t2.participant_type = ParticipantType.TERMINAL.value
# disagreement
t1.berth_id = 1
t2.berth_id = 2
time_objects = [t1, t2]
df_times = pd.DataFrame.from_records([to_.__dict__ for to_ in time_objects])
df_times.set_index('id',inplace=True)
(state, description) = vr.validation_rule_fct_agency_and_terminal_berth_id_disagreement(shipcall, df_times)
assert state.value > StatusFlags.GREEN.value, f"a violation must be identified"
assert description is not None, f"a violation description must be identified"
return
def test_validation_rule_fct_agency_and_terminal_berth_id_agreement(build_sql_proxy_connection):
"""#0006-B validation_rule_fct_agency_and_terminal_pier_side_disagreement"""
import pandas as pd
from BreCal.stubs.times_full import get_times_full_simple
from BreCal.stubs.shipcall import get_shipcall_simple
from BreCal.database.enums import ParticipantType
from BreCal.database.enums import StatusFlags
vr = build_sql_proxy_connection["vr"]
shipcall = get_shipcall_simple()
t1 = get_times_full_simple()
t2 = get_times_full_simple()
# roles: agency & terminal
t1.participant_type = ParticipantType.AGENCY.value
t2.participant_type = ParticipantType.TERMINAL.value
# agreement
t1.berth_id = 21
t2.berth_id = 21
time_objects = [t1, t2]
df_times = pd.DataFrame.from_records([to_.__dict__ for to_ in time_objects])
df_times.set_index('id',inplace=True)
(state, description) = vr.validation_rule_fct_agency_and_terminal_berth_id_disagreement(shipcall, df_times)
assert state.value == StatusFlags.GREEN.value, f"no violation should be observed"
assert description is None, f"no violation should be observed"
return

View File

@ -0,0 +1,26 @@
import pytest
from BreCal.database.enums import StatusFlags
def test_validation_rule_state_green_is_1():
assert StatusFlags.GREEN.value==1
return
def test_validation_rule_state_yellow_is_2():
assert StatusFlags.YELLOW.value==2
return
def test_validation_rule_state_red_is_3():
assert StatusFlags.RED.value==3
return
def test_validation_rule_state_order():
# Red 3, Yellow 2, Green 1, None 0
# red>yellow>green>none
assert StatusFlags.RED.value>StatusFlags.YELLOW.value
assert StatusFlags.YELLOW.value>StatusFlags.GREEN.value
assert StatusFlags.GREEN.value>StatusFlags.NONE.value
return
if __name__=="__main__":
pass