diff --git a/.gitignore b/.gitignore
index ca7e6d6..3919206 100644
--- a/.gitignore
+++ b/.gitignore
@@ -435,12 +435,16 @@ FodyWeavers.xsd
*.msm
*.msp
+# Local change logs, egg-infos from package installation and local notebooks for development and interactive work
times.md
docs/traffic_light_examples
misc/mysql-workbench-community_**
misc/berths_and_terminals.csv
+
src/notebooks_metz
src/server/BreCal.egg-info
src/lib_brecal_utils/**.egg-info/**
+**.egg-info
+**/.~lock*
-
+Ampelfunktion.md
\ No newline at end of file
diff --git a/docs/BremenCalling_Datenmodell.xlsx b/docs/BremenCalling_Datenmodell.xlsx
index 67151a3..7f17806 100644
Binary files a/docs/BremenCalling_Datenmodell.xlsx and b/docs/BremenCalling_Datenmodell.xlsx differ
diff --git a/environment.yml b/environment.yml
new file mode 100644
index 0000000..6f4c81a
--- /dev/null
+++ b/environment.yml
@@ -0,0 +1,51 @@
+name: brecal
+
+channels:
+ - conda-forge
+ - carta
+ - anaconda
+ - defaults
+
+dependencies:
+ - pip
+ - colorama>=0.4.6=pyhd8ed1ab_0
+ - coverage>=7.3.0=py311h459d7ec_0
+ - ipykernel>=6.25.1=pyh71e2992_0
+ - ipython>=8.14.0=pyh41d4057_0
+ - jupyterlab>=4.0.5=pyhd8ed1ab_0
+ - mamba>=1.4.9=py311h3072747_0
+ - conda-forge::matplotlib-base>=3.7.2=py311h54ef318_0
+ - conda-forge::matplotlib>=3.7.2=py311h38be061_0
+ - matplotlib-inline>=0.1.6=pyhd8ed1ab_0
+ - conda-forge::pytest>=7.4.0=pyhd8ed1ab_0
+ - conda-forge::pytest-cov>=4.1.0=pyhd8ed1ab_0
+ - python>=3.11.4=hab00c5b_0_cpython
+ - pytz>=2023.3=pyhd8ed1ab_0
+ - setuptools>=68.0.0=pyhd8ed1ab_0
+ - tqdm>=4.66.1=pyhd8ed1ab_0
+ - typing_extensions>=4.7.1=pyha770c72_0
+ - typing_utils>=0.1.0=pyhd8ed1ab_0
+ - conda-forge::pandas>=2.1.0=py311h320fe9a_0
+ - conda-forge/noarch::flask>=2.3.3=pyhd8ed1ab_0
+ - conda-forge::mysql-connector-python>=8.0.31=py311h0cf059c_2
+ - conda-forge::marshmallow>=3.20.1=pyhd8ed1ab_0
+ - conda-forge::marshmallow-dataclass>=8.5.14=pyhd8ed1ab_0
+ - conda-forge::webargs>=8.3.0=pyhd8ed1ab_0
+ - conda-forge::bcrypt>=4.0.1=py311h46250e7_0
+
+ - conda-forge::cached-property>=1.5.2=hd8ed1ab_1
+ - conda-forge::cached_property>=1.5.2=pyha770c72_1
+ - conda-forge::dsnparse>=0.2.1=pyhd8ed1ab_0
+
+ - conda-forge::schedule>=1.2.0=pyhd8ed1ab_0
+
+ - pip:
+ # pip packages and wheels
+ - pyjwt==2.7.0
+ - flask-jwt-extended
+ - pydapper[mysql-connector-python]
+ - coro-context-manager
+ - -e /home/scope/brecal/src/lib_brecal_utils/.
+ - -e /home/scope/brecal/src/server/.
+
+prefix: /home/scope/anaconda3/envs/brecal
diff --git a/src/BreCalClient/Resources/arrow_down_red.png b/src/BreCalClient/Resources/arrow_down_red.png
index 7132feb..14046cc 100644
Binary files a/src/BreCalClient/Resources/arrow_down_red.png and b/src/BreCalClient/Resources/arrow_down_red.png differ
diff --git a/src/BreCalClient/ShipcallExtraControl.xaml b/src/BreCalClient/ShipcallExtraControl.xaml
new file mode 100644
index 0000000..c81f640
--- /dev/null
+++ b/src/BreCalClient/ShipcallExtraControl.xaml
@@ -0,0 +1,14 @@
+
+
+
+
+
diff --git a/src/BreCalClient/ShipcallExtraControl.xaml.cs b/src/BreCalClient/ShipcallExtraControl.xaml.cs
new file mode 100644
index 0000000..8f76d6f
--- /dev/null
+++ b/src/BreCalClient/ShipcallExtraControl.xaml.cs
@@ -0,0 +1,55 @@
+// Copyright (c) 2023 schick Informatik
+// Description: Extra shipcall info shown in drop down
+//
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using System.Windows;
+using System.Windows.Controls;
+using System.Windows.Data;
+using System.Windows.Documents;
+using System.Windows.Input;
+using System.Windows.Media;
+using System.Windows.Media.Imaging;
+using System.Windows.Navigation;
+using System.Windows.Shapes;
+
+namespace BreCalClient
+{
+ ///
+ /// Interaction logic for ShipcallExtraControl.xaml
+ ///
+ public partial class ShipcallExtraControl : UserControl
+ {
+ public ShipcallExtraControl()
+ {
+ InitializeComponent();
+ }
+
+ #region events
+
+ public event Action? CloseExtraRequested;
+
+ #endregion
+
+ #region Properties
+
+ ///
+ /// this is our datasource
+ ///
+ public ShipcallControlModel? ShipcallControlModel { get; set; }
+
+ #endregion
+
+ private void buttonCloseDropDown_Click(object sender, RoutedEventArgs e)
+ {
+ if (this.CloseExtraRequested != null)
+ {
+ this.CloseExtraRequested(this);
+ }
+ }
+ }
+}
diff --git a/src/brecal.mysql/DBManager.cs b/src/brecal.mysql/DBManager.cs
index d5c5571..4c9c593 100644
--- a/src/brecal.mysql/DBManager.cs
+++ b/src/brecal.mysql/DBManager.cs
@@ -55,4 +55,4 @@ namespace brecal.mysql
}
}
-}
\ No newline at end of file
+}
diff --git a/src/lib_brecal_utils/brecal_utils/__init__.py b/src/lib_brecal_utils/brecal_utils/__init__.py
new file mode 100644
index 0000000..b96d315
--- /dev/null
+++ b/src/lib_brecal_utils/brecal_utils/__init__.py
@@ -0,0 +1,20 @@
+from ._version import __version__
+from brecal_utils.file_handling import get_project_root, ensure_path
+from brecal_utils.test_handling import execute_test_with_pytest, execute_coverage_test
+from brecal_utils.time_handling import difference_to_then
+
+from brecal_utils.validators.time_logic import TimeLogic
+from brecal_utils.validators.validation_rules import ValidationRules
+from brecal_utils.validators.schema_validation import validation_state_and_validation_name
+
+__all__ = [
+ "get_project_root",
+ "ensure_path",
+ "execute_test_with_pytest",
+ "execute_coverage_test",
+ "difference_to_then",
+ "TimeLogic",
+ "ValidationRules",
+ "validation_state_and_validation_name",
+]
+
diff --git a/src/lib_brecal_utils/brecal_utils/_version.py b/src/lib_brecal_utils/brecal_utils/_version.py
new file mode 100644
index 0000000..15cf400
--- /dev/null
+++ b/src/lib_brecal_utils/brecal_utils/_version.py
@@ -0,0 +1 @@
+__version__="0.0.1"
\ No newline at end of file
diff --git a/src/lib_brecal_utils/brecal_utils/database/__init__.py b/src/lib_brecal_utils/brecal_utils/database/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/lib_brecal_utils/brecal_utils/database/enums.py b/src/lib_brecal_utils/brecal_utils/database/enums.py
new file mode 100644
index 0000000..e038643
--- /dev/null
+++ b/src/lib_brecal_utils/brecal_utils/database/enums.py
@@ -0,0 +1,38 @@
+from enum import Enum
+
+class ParticipantType(Enum):
+ """determines the type of a participant"""
+ NONE = 0
+ BSMD = 1
+ TERMINAL = 2
+ PILOT = 4
+ AGENCY = 8
+ MOORING = 16
+ PORT_ADMINISTRATION = 32
+ TUG = 64
+
+class ShipcallType(Enum):
+ """determines the type of a shipcall, as this changes the applicable validation rules"""
+ INCOMING = 1
+ OUTGOING = 2
+ SHIFTING = 3
+
+class ParticipantwiseTimeDelta():
+ """stores the time delta for every participant, which triggers the validation rules in the rule set '0001'"""
+ AGENCY = 1200.0 # 20 h * 60 min/h = 1200 min
+ MOORING = 960.0 # 16 h * 60 min/h = 960 min
+ PILOT = 960.0 # 16 h * 60 min/h = 960 min
+ PORT_ADMINISTRATION = 960.0 # 16 h * 60 min/h = 960 min
+ TUG = 960.0 # 16 h * 60 min/h = 960 min
+ TERMINAL = 960.0 # 16 h * 60 min/h = 960 min
+
+class StatusFlags(Enum):
+ """
+ these enumerators ensure that each traffic light validation rule state corresponds to a value, which will be used in the ValidationRules object to identify
+ the necessity of notifications.
+ """
+ NONE = 0
+ GREEN = 1
+ YELLOW = 2
+ RED = 3
+
diff --git a/src/lib_brecal_utils/brecal_utils/database/sql_handler.py b/src/lib_brecal_utils/brecal_utils/database/sql_handler.py
new file mode 100644
index 0000000..d50bbbd
--- /dev/null
+++ b/src/lib_brecal_utils/brecal_utils/database/sql_handler.py
@@ -0,0 +1,200 @@
+import numpy as np
+import pandas as pd
+import datetime
+from BreCal.schemas.model import Shipcall, Ship, Participant, Berth, User, Times
+from brecal_utils.database.enums import ParticipantType
+
+class SQLHandler():
+ """
+ An object that reads SQL queries from the sql_connection and stores it in pandas DataFrames. The object can read all available tables
+ at once into memory, when providing 'read_all=True'.
+
+ # #TODO_initialization: shipcall_tug_map, user_role_map & role_securable_map might be mapped to the respective dataframes
+ """
+ def __init__(self, sql_connection, read_all=False):
+ self.sql_connection = sql_connection
+ self.all_schemas = self.get_all_schemas_from_mysql()
+ self.build_str_to_model_dict()
+
+ if read_all:
+ self.read_all(self.all_schemas)
+
+ def get_all_schemas_from_mysql(self):
+ with self.sql_connection.cursor(buffered=True) as cursor:
+ cursor.execute("SHOW TABLES")
+ schema = cursor.fetchall()
+ all_schemas = [schem[0] for schem in schema]
+ return all_schemas
+
+ def build_str_to_model_dict(self):
+ """
+ creates a simple dictionary, which maps a string to a data object
+ e.g.,
+ 'ship'->BreCal.schemas.model.Ship object
+ """
+ self.str_to_model_dict = {
+ "shipcall":Shipcall, "ship":Ship, "participant":Participant, "berth":Berth, "user":User, "times":Times
+ }
+ return
+
+ def read_mysql_table_to_df(self, table_name:str):
+ """determine a {table_name}, which will be read from a mysql server. returns a pandas DataFrame with the respective data"""
+ df = pd.read_sql(sql=f"SELECT * FROM {table_name}", con=self.sql_connection)
+ return df
+
+ def mysql_to_df(self, query):
+ """provide an arbitrary sql query that should be read from a mysql server {sql_connection}. returns a pandas DataFrame with the obtained data"""
+ df = pd.read_sql(query, self.sql_connection).convert_dtypes()
+ df = df.set_index('id', inplace=False) # avoid inplace updates, so the raw sql remains unchanged
+ return df
+
+ def read_all(self, all_schemas):
+ # create a dictionary, which maps every mysql schema to pandas DataFrames
+ self.df_dict = self.build_full_mysql_df_dict(all_schemas)
+
+ # update the 'participants' column in 'shipcall'
+ self.initialize_shipcall_participant_list()
+ return
+
+ def build_full_mysql_df_dict(self, all_schemas):
+ """given a list of strings {all_schemas}, every schema will be read as individual pandas DataFrames to a dictionary with the respective keys. returns: dictionary {schema_name:pd.DataFrame}"""
+ mysql_df_dict = {}
+ for schem in all_schemas:
+ query = f"SELECT * FROM {schem}"
+ mysql_df_dict[schem] = self.mysql_to_df(query)
+ return mysql_df_dict
+
+ def initialize_shipcall_participant_list(self):
+ """
+ iteratively applies the .get_participants method to each shipcall.
+ the function updates the 'participants' column.
+ """
+ # 1.) get all shipcalls
+ df = self.df_dict.get('shipcall')
+
+ # 2.) iterate over each individual shipcall, obtain the id (pandas calls it 'name')
+ # and apply the 'get_participants' method, which returns a list
+ # if the shipcall_id exists, the list contains ids
+ # otherwise, return a blank list
+ df['participants'] = df.apply(
+ lambda x: self.get_participants(x.name),
+ axis=1)
+ return
+
+ def standardize_model_str(self, model_str:str)->str:
+ """check if the 'model_str' is valid and apply lowercasing to the string"""
+ model_str = model_str.lower()
+ assert model_str in list(self.df_dict.keys()), f"cannot find the requested 'model_str' in mysql: {model_str}"
+ return model_str
+
+ def get_data(self, id:int, model_str:str):
+ """
+ obtains {id} from the respective mysql database and builds a data model from that.
+ the id should match the 'id'-column in the mysql schema.
+ returns: data model, such as Ship, Shipcall, etc.
+
+ e.g.,
+ data = self.get_data(0,"shipcall")
+ returns a Shipcall object
+ """
+ model_str = self.standardize_model_str(model_str)
+
+ df = self.df_dict.get(model_str)
+ data = self.df_loc_to_data_model(df, id, model_str)
+ return data
+
+ def get_all(self, model_str:str)->list:
+ """
+ given a model string (e.g., 'shipcall'), return a list of all
+ data models of that type from the sql
+ """
+ model_str = self.standardize_model_str(model_str)
+ all_ids = self.df_dict.get(model_str).index
+
+ all_data = [
+ self.get_data(_aid, model_str)
+ for _aid in all_ids
+ ]
+ return all_data
+
+ def df_loc_to_data_model(self, df, id, model_str, loc_type:str="loc"):
+ assert len(df)>0, f"empty dataframe"
+
+ # get a pandas series from the dataframe
+ series = df.loc[id] if loc_type=="loc" else df.iloc[id]
+
+ # get the respective data model object
+ data_model = self.str_to_model_dict.get(model_str,None)
+ assert data_model is not None, f"could not find the requested model_str: {model_str}"
+
+ # build 'data' and fill the data model object
+ data = {**{'id':id}, **series.to_dict()} # 'id' must be added manually, as .to_dict does not contain the index, which was set with .set_index
+ data = data_model(**data)
+ return data
+
+ def get_times_for_participant_type(self, df_times, participant_type:int):
+ filtered_series = df_times.loc[df_times["participant_type"]==participant_type]
+ assert len(filtered_series)<=1, f"found multiple results"
+ times = self.df_loc_to_data_model(filtered_series, id=0, model_str='times', loc_type="iloc") # use iloc! to retrieve the first result
+ return times
+
+ def dataframe_to_data_model_list(self, df, model_str)->list:
+ model_str = self.standardize_model_str(model_str)
+
+ all_ids = df.index
+ all_data = [
+ self.df_loc_to_data_model(df, _aid, model_str)
+ for _aid in all_ids
+ ]
+ return all_data
+
+ def get_participants(self, shipcall_id:id)->list:
+ """
+ given a {shipcall_id}, obtain the respective list of participants.
+ when there are no participants, return a blank list
+
+ returns: participant_id_list, where every element is an int
+ """
+ df = self.df_dict.get("shipcall_participant_map")
+ df = df.set_index('shipcall_id', inplace=False)
+
+ # the 'if' call is needed to ensure, that no Exception is raised, when the shipcall_id is not present in the df
+ participant_id_list = df.loc[shipcall_id, "participant_id"].to_list() if shipcall_id in list(df.index) else []
+ return participant_id_list
+
+ def get_times_of_shipcall(self, shipcall)->pd.DataFrame:
+ df_times = self.df_dict.get('times') # -> pd.DataFrame
+ df_times = df_times.loc[df_times["shipcall_id"]==shipcall.id]
+ return df_times
+
+ def get_times_for_agency(self, non_null_column=None)->pd.DataFrame:
+ """
+ options:
+ non_null_column:
+ None or str. If provided, the 'non_null_column'-column of the dataframe will be filtered,
+ so only entries with provided values are returned (filters all NaN and NaT entries)
+ """
+ # get all times
+ df_times = self.df_dict.get('times') # -> pd.DataFrame
+
+ # filter out all NaN and NaT entries
+ if non_null_column is not None:
+ df_times = df_times.loc[~df_times[non_null_column].isnull()] # NOT null filter
+
+ # filter by the agency participant_type
+ times_agency = df_times.loc[df_times["participant_type"]==ParticipantType.AGENCY.value]
+ return times_agency
+
+ def filter_df_by_key_value(self, df, key, value)->pd.DataFrame:
+ return df.loc[df[key]==value]
+
+ def get_unique_ship_counts(self, all_df_times:pd.DataFrame, query:str, rounding:str="min", maximum_threshold=3):
+ """given a dataframe of all agency times, get all unique ship counts, their values (datetime) and the string tags. returns a tuple (values,unique,counts)"""
+ # get values and optional: rounding
+ values = all_df_times.loc[:, query]
+ if rounding is not None:
+ values = values.dt.round(rounding) # e.g., 'min'
+
+ unique, counts = np.unique(values, return_counts=True)
+ violation_state = np.any(np.greater(counts, maximum_threshold))
+ return (values, unique, counts)
diff --git a/src/lib_brecal_utils/brecal_utils/file_handling.py b/src/lib_brecal_utils/brecal_utils/file_handling.py
new file mode 100644
index 0000000..0b3c171
--- /dev/null
+++ b/src/lib_brecal_utils/brecal_utils/file_handling.py
@@ -0,0 +1,46 @@
+
+
+import os
+
+def get_project_root(root_base_name:str, root_dir:None=None):
+ """
+ given a {root_base_name}, this function searches the parent folders of {root_dir} until
+ the basename matches.
+
+ Example:
+ root_base_name = "brecal"
+ root_dir = "/home/arbitrary_user/brecal/_template/tests"
+
+ returns: "/home/arbitrary_user/brecal"
+
+ arguments:
+ root_base_name:str, base directory name that should be searched for
+ root_dir: defaults to 'None', whereas then the current working directory is selected. Can be an arbitrary path.
+
+ returns: root_dir
+ """
+ if root_dir is None:
+ root_dir = os.getcwd()
+
+ assert root_base_name in root_dir, f"the desired base name MUST be present within the root directory.\nRoot Directory: {root_dir}\nDesired Root Base Name: {root_base_name}"
+ assert root_dir.count(root_base_name)==1, f"found multiple matches for root_base_name" # do not change, as a pytest requires precise wording
+
+
+ while not os.path.basename(root_dir)==root_base_name:
+ root_dir = os.path.dirname(root_dir)
+
+ return root_dir
+
+def ensure_path(path, print_info=0):
+ """
+ Function ensures that a certain directory exists. If it does not exist, it will be created.
+ It further checks if the parent-directory of the file exists and also ensures that.
+
+ options:
+ print_info: print additional information (debugging)
+ """
+ if not os.path.exists(path):
+ os.makedirs(path)
+ if print_info == 1:
+ print(f"Created directory and subdirectories: {path}")
+ return
diff --git a/src/lib_brecal_utils/brecal_utils/request_status_code.py b/src/lib_brecal_utils/brecal_utils/request_status_code.py
new file mode 100644
index 0000000..c3d72ea
--- /dev/null
+++ b/src/lib_brecal_utils/brecal_utils/request_status_code.py
@@ -0,0 +1,131 @@
+import json
+from abc import ABC, abstractmethod
+from BreCal.schemas.model import obj_dict
+
+
+"""implementation of default objects for http request codes. this enforces standardized outputs in the (response, code, headers)-style"""
+
+def get_request_code(code_id):
+ """convenience function, which returns the desired request code object"""
+ request_code_dict = {
+ 200:RequestCode_HTTP_200_OK,
+ 201:RequestCode_HTTP_201_CREATED,
+ 400:RequestCode_HTTP_400_BAD_REQUEST,
+ 403:RequestCode_HTTP_403_FORBIDDEN,
+ 404:RequestCode_HTTP_404_NOT_FOUND,
+ 500:RequestCode_HTTP_500_INTERNAL_SERVER_ERROR
+ }
+
+ assert code_id in list(request_code_dict.keys()), f"unsupported request code: {code_id}. \nAvailable codes: {request_code_dict}"
+ return request_code_dict.get(code_id)()
+
+class RequestStatusCode(ABC):
+ def __init__(self):
+ return
+
+ @abstractmethod
+ def __call__(self, data):
+ raise NotImplementedError("any default status code object must be callable")
+
+ @abstractmethod
+ def status_code(self):
+ raise NotImplementedError("any default status code object should return an integer")
+
+ @abstractmethod
+ def response(self, data):
+ raise NotImplementedError("the response method should return a binary json object. typically, json.dumps is used")
+
+
+ def headers(self):
+ return {'Content-Type': 'application/json; charset=utf-8'}
+
+
+class RequestCode_HTTP_200_OK(RequestStatusCode):
+ def __init__(self) -> None:
+ super().__init__()
+
+ def __call__(self, data):
+ return (self.response(data), self.status_code(), self.headers())
+
+ def status_code(self):
+ return 200
+
+ def response(self, data):
+ return json.dumps(data, default=obj_dict)
+
+
+class RequestCode_HTTP_201_CREATED(RequestStatusCode):
+ def __init__(self) -> None:
+ super().__init__()
+
+ def __call__(self, data):
+ return (self.response(data), self.status_code(), self.headers())
+
+ def status_code(self):
+ return 201
+
+ def response(self, new_id):
+ return json.dumps({"id":new_id})
+
+
+class RequestCode_HTTP_400_BAD_REQUEST(RequestStatusCode):
+ def __init__(self) -> None:
+ super().__init__()
+
+ def __call__(self, data):
+ return (self.response(data), self.status_code(), self.headers())
+
+ def status_code(self):
+ return 400
+
+ def response(self, data):
+ return json.dumps(data)
+
+
+class RequestCode_HTTP_403_FORBIDDEN(RequestStatusCode):
+ def __init__(self) -> None:
+ super().__init__()
+
+ def __call__(self, data):
+ return (self.response(data), self.status_code(), self.headers())
+
+ def status_code(self):
+ return 403
+
+ def response(self, message="invalid credentials"):
+ result = {}
+ result["message"] = message
+ return json.dumps(result)
+
+
+class RequestCode_HTTP_404_NOT_FOUND(RequestStatusCode):
+ def __init__(self) -> None:
+ super().__init__()
+
+ def __call__(self, data):
+ return (self.response(data), self.status_code(), self.headers())
+
+ def status_code(self):
+ return 404
+
+ def response(self, message="no such record"):
+ result = {}
+ result["message"] = message
+ return json.dumps(result)
+
+
+class RequestCode_HTTP_500_INTERNAL_SERVER_ERROR(RequestStatusCode):
+ def __init__(self) -> None:
+ super().__init__()
+
+ def __call__(self, data):
+ return (self.response(data), self.status_code(), self.headers())
+
+ def status_code(self):
+ return 500
+
+ def response(self, message="credential lookup mismatch"):
+ result = {}
+ result["message"] = message
+ return json.dumps(result)
+
diff --git a/src/lib_brecal_utils/brecal_utils/stubs/__init__.py b/src/lib_brecal_utils/brecal_utils/stubs/__init__.py
new file mode 100644
index 0000000..0516049
--- /dev/null
+++ b/src/lib_brecal_utils/brecal_utils/stubs/__init__.py
@@ -0,0 +1,5 @@
+
+def generate_uuid1_int():
+ """# TODO: clarify, what kind of integer ID is used in mysql"""
+ from uuid import uuid1
+ return uuid1().int>>64
diff --git a/src/lib_brecal_utils/brecal_utils/stubs/berth.py b/src/lib_brecal_utils/brecal_utils/stubs/berth.py
new file mode 100644
index 0000000..a335c18
--- /dev/null
+++ b/src/lib_brecal_utils/brecal_utils/stubs/berth.py
@@ -0,0 +1,27 @@
+import datetime
+from brecal_utils.stubs import generate_uuid1_int
+from BreCal.schemas.model import Berth
+
+def get_berth_simple():
+ berth_id = generate_uuid1_int() # uid?
+
+ # Note: #TODO: name, participant_id & lock state are arbitrary
+ name = "Avangard Dalben"
+ participant_id = 1 # e.g., Avangard
+ lock = False
+
+ created = datetime.datetime.now()
+ modified = created+datetime.timedelta(seconds=10)
+ deleted = modified+datetime.timedelta(seconds=3)
+
+ berth = Berth(
+ berth_id,
+ name,
+ participant_id,
+ lock,
+ created,
+ modified,
+ deleted,
+ )
+ return berth
+
diff --git a/src/lib_brecal_utils/brecal_utils/stubs/notification.py b/src/lib_brecal_utils/brecal_utils/stubs/notification.py
new file mode 100644
index 0000000..765eea4
--- /dev/null
+++ b/src/lib_brecal_utils/brecal_utils/stubs/notification.py
@@ -0,0 +1,49 @@
+import datetime
+from brecal_utils.stubs import generate_uuid1_int
+from BreCal.schemas.model import Notification
+
+
+def get_notification_simple():
+ """creates a default notification, where 'created' is now, and modified is now+10 seconds"""
+ notification_id = generate_uuid1_int() # uid?
+ times_id = generate_uuid1_int() # uid?
+ acknowledged = False
+ level = 10
+ type = 0
+ message = "hello world"
+ created = datetime.datetime.now()
+ modified = created+datetime.timedelta(seconds=10)
+
+ notification = Notification(
+ notification_id,
+ times_id,
+ acknowledged,
+ level,
+ type,
+ message,
+ created,
+ modified
+ )
+ return notification
+
+def get_notification_in_the_past(created_delta_seconds, modified_delta_seconds, acknowledged=False):
+ """
+ creates a notification of the past, where the
+ 'created' date is {created_delta_seconds} seconds ago
+ 'modified' date is {modified_delta_seconds} seconds ago
+
+ for example, if datetime.datetime.now() returns
+ now = datetime.datetime(2023, 9, 15, 7, 25, 50, 733644)), then calling this function
+ as get_notification_modified_in_the_past(2*60, 1*60) provides
+ 'created':datetime.datetime(2023, 9, 15, 7, 23, 50, 733644) (two minutes ago)
+ 'modified':datetime.datetime(2023, 9, 15, 7, 24, 50, 733644) (one minute ago)
+
+ optionally, one can also overwrite the 'acknowledged' attribute
+ returns notification
+ """
+ notification = get_notification_simple()
+ notification.created = datetime.datetime.now()-datetime.timedelta(seconds=created_delta_seconds)
+ notification.modified = datetime.datetime.now()-datetime.timedelta(seconds=modified_delta_seconds)
+ notification.acknowledged = acknowledged
+ return notification
+
diff --git a/src/lib_brecal_utils/brecal_utils/stubs/participant.py b/src/lib_brecal_utils/brecal_utils/stubs/participant.py
new file mode 100644
index 0000000..697a011
--- /dev/null
+++ b/src/lib_brecal_utils/brecal_utils/stubs/participant.py
@@ -0,0 +1,32 @@
+import datetime
+from brecal_utils.stubs import generate_uuid1_int
+from BreCal.schemas.model import Participant
+
+def get_participant_simple():
+ participant_id = generate_uuid1_int()
+
+ # #TODO: role_type and flags are arbitrary
+ name = "Max Mustermann"
+ street = "Musterstrasse 1"
+ postal_code = "12345"
+ city = "Bremen"
+ role_type = 1 # integer
+ flags = 0 # integer. unclear
+
+ created = datetime.datetime.now()
+ modified = created+datetime.timedelta(seconds=10)
+ deleted = modified+datetime.timedelta(seconds=3)
+
+ participant = Participant(
+ participant_id,
+ name,
+ street,
+ postal_code,
+ city,
+ role_type,
+ flags,
+ created,
+ modified,
+ deleted
+ )
+ return participant
diff --git a/src/lib_brecal_utils/brecal_utils/stubs/roles.py b/src/lib_brecal_utils/brecal_utils/stubs/roles.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/lib_brecal_utils/brecal_utils/stubs/ship.py b/src/lib_brecal_utils/brecal_utils/stubs/ship.py
new file mode 100644
index 0000000..61468c3
--- /dev/null
+++ b/src/lib_brecal_utils/brecal_utils/stubs/ship.py
@@ -0,0 +1,38 @@
+import datetime
+from brecal_utils.stubs import generate_uuid1_int
+from BreCal.schemas.model import Ship
+
+def get_ship_simple():
+ ship_id = generate_uuid1_int()
+ name = "african halcyon".upper() # 'Schiffe_sample_format.xlsx' uses .upper() for every ship
+ imo = 9343613 # assert str(len(imo))==7
+ callsign = 1234567 # up to 7 characters. assert str(len(callsign))<=7
+ participant_id = generate_uuid1_int()
+ length = 177.13 # assert 0>length<=500
+ width = 28.4 # assert 0>width<=500
+ is_tug = False
+ bollard_pull = None # only if is_tug
+ participant_id = None # only if is_tug
+ eni = "01234567" # Alternative to IMO. Dynamic assertion? assert len(str(eni))==8
+
+ created = datetime.datetime.now()
+ modified = created+datetime.timedelta(seconds=10)
+ deleted = modified+datetime.timedelta(seconds=3)
+
+ ship = Ship(
+ ship_id,
+ name,
+ imo,
+ callsign,
+ participant_id,
+ length,
+ width,
+ is_tug,
+ bollard_pull,
+ eni,
+ created,
+ modified,
+ deleted
+ )
+ return ship
+
diff --git a/src/lib_brecal_utils/brecal_utils/stubs/shipcall.py b/src/lib_brecal_utils/brecal_utils/stubs/shipcall.py
new file mode 100644
index 0000000..6762692
--- /dev/null
+++ b/src/lib_brecal_utils/brecal_utils/stubs/shipcall.py
@@ -0,0 +1,81 @@
+import datetime
+from brecal_utils.stubs import generate_uuid1_int
+from BreCal.schemas.model import Shipcall
+from dataclasses import field
+
+def get_shipcall_simple():
+ # only used for the stub
+ base_time = datetime.datetime.now()
+
+ shipcall_id = generate_uuid1_int()
+ ship_id = generate_uuid1_int()
+
+ eta = base_time+datetime.timedelta(hours=3, minutes=12)
+ role_type = 1
+ voyage = "987654321"
+ etd = base_time+datetime.timedelta(hours=6, minutes=12) # should never be before eta
+
+ arrival_berth_id = generate_uuid1_int()
+ departure_berth_id = generate_uuid1_int()
+
+ tug_required = False
+ pilot_required = False
+
+ flags = 0 # #TODO_shipcall_flags. What is meant here? What should be tested?
+ pier_side = False # whether a ship will be fixated on the pier side. en: pier side, de: Anlegestelle. From 'BremenCalling_Datenmodell.xlsx': gedreht/ungedreht
+ bunkering = False # #TODO_bunkering_unclear
+ replenishing_terminal = False # en: replenishing terminal, de: Nachfüll-Liegeplatz
+ replenishing_lock = False # en: replenishing lock, de: Nachfüllschleuse
+
+ draft = 0.12 # #TODO_draft_value: clarify, what 'draft' means and what kind of values are to be expected
+
+ # tidal window: built in a way, where ETA and ETD are in-between the window
+ # #TODO_tidal_window_source: are these windows taken from a database or provided by the user? How do they know this?
+ tidal_window_from = base_time+datetime.timedelta(hours=2, minutes=12)
+ tidal_window_to = base_time+datetime.timedelta(hours=7, minutes=12)
+ rain_sensitive_cargo = False
+ recommended_tugs = 2 # assert 0eta (for berth) & lock
+ # note 2: times are currently computed as a sequence of (eta_berth -> lock_time -> etd_berth -> zone_entry). The deltas are arbitrary
+ times_id = generate_uuid1_int()
+
+ eta_berth = base_time+datetime.timedelta(hours=1, minutes=12)
+ eta_berth_fixed = False
+
+ lock_time = eta_berth+datetime.timedelta(hours=0, minutes=50)
+ lock_time_fixed = False
+
+ etd_berth = lock_time+datetime.timedelta(hours=0, minutes=45)
+ etd_berth_fixed = False
+
+ zone_entry = etd_berth+datetime.timedelta(hours=0, minutes=15)
+ zone_entry_fixed = False
+
+ operations_start = zone_entry+datetime.timedelta(hours=1, minutes=30)
+ operations_end = operations_start+datetime.timedelta(hours=4, minutes=30)
+
+ remarks = "" # assert len(remarks)<{max_len_threshold}
+
+ participant_id = generate_uuid1_int()
+ shipcall_id = generate_uuid1_int()
+
+ berth_id = generate_uuid1_int()
+ berth_info = ""
+ pier_side = True
+ participant_type = None
+
+ created = datetime.datetime.now()
+ modified = created+datetime.timedelta(seconds=10)
+
+ times = Times(
+ id=times_id,
+ eta_berth=eta_berth,
+ eta_berth_fixed=eta_berth_fixed,
+ etd_berth=etd_berth,
+ etd_berth_fixed=etd_berth_fixed,
+ lock_time=lock_time,
+ lock_time_fixed=lock_time_fixed,
+ zone_entry=zone_entry,
+ zone_entry_fixed=zone_entry_fixed,
+ operations_start=operations_start,
+ operations_end=operations_end,
+ remarks=remarks,
+ participant_id=participant_id,
+ berth_id=berth_id,
+ berth_info=berth_info,
+ pier_side=pier_side,
+ participant_type=participant_type,
+ shipcall_id=shipcall_id,
+ created=created,
+ modified=modified,
+ )
+ return times
diff --git a/src/lib_brecal_utils/brecal_utils/stubs/times_mooring.py b/src/lib_brecal_utils/brecal_utils/stubs/times_mooring.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/lib_brecal_utils/brecal_utils/stubs/times_pilot.py b/src/lib_brecal_utils/brecal_utils/stubs/times_pilot.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/lib_brecal_utils/brecal_utils/stubs/times_portauthority.py b/src/lib_brecal_utils/brecal_utils/stubs/times_portauthority.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/lib_brecal_utils/brecal_utils/stubs/times_terminal.py b/src/lib_brecal_utils/brecal_utils/stubs/times_terminal.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/lib_brecal_utils/brecal_utils/stubs/user.py b/src/lib_brecal_utils/brecal_utils/stubs/user.py
new file mode 100644
index 0000000..12b8d2c
--- /dev/null
+++ b/src/lib_brecal_utils/brecal_utils/stubs/user.py
@@ -0,0 +1,35 @@
+import bcrypt
+import datetime
+from brecal_utils.stubs import generate_uuid1_int
+from BreCal.schemas.model import User
+
+
+def get_user_simple():
+ user_id = generate_uuid1_int()
+ participant_id = generate_uuid1_int() # should be taken from the database
+
+ first_name = "Max"
+ last_name = "Mustermann"
+ user_name = "maxm123"
+ user_email = "max.mustermann@brecal.de"
+ user_phone = "0173123456" # formatting?
+ password_hash = bcrypt.hashpw("123456".encode('utf-8'), bcrypt.gensalt( 12 )).decode('utf8')
+ api_key = bcrypt.hashpw("apikey123".encode('utf-8'), bcrypt.gensalt( 12 )).decode('utf8')
+
+ created = datetime.datetime.now()
+ modified = created+datetime.timedelta(seconds=10)
+
+ user = User(
+ user_id,
+ participant_id,
+ first_name,
+ last_name,
+ user_name,
+ user_email,
+ user_phone,
+ password_hash,
+ api_key,
+ created,
+ modified
+ )
+ return user
\ No newline at end of file
diff --git a/src/lib_brecal_utils/brecal_utils/test_handling.py b/src/lib_brecal_utils/brecal_utils/test_handling.py
new file mode 100644
index 0000000..3757f0d
--- /dev/null
+++ b/src/lib_brecal_utils/brecal_utils/test_handling.py
@@ -0,0 +1,84 @@
+
+def execute_test_with_pytest(filepath):
+ """
+ creates a subprocess to use 'pytest' on a script. Every function inside the filepath
+ will be tested individually. The function returns verbose information about the outcome.
+
+ filepath:
+ can either be an individual .py file or a root directory, which contains multiple files
+ """
+ import os
+ import pytest
+ from subprocess import Popen, PIPE
+
+ assert os.path.exists(filepath), f"cannot find file {filepath}"
+
+ with Popen(['pytest',
+ '-v',
+ '-W ignore::DeprecationWarning',
+ '-vv',
+ '--durations=0',
+ '--tb=short', # shorter traceback format
+ str(filepath)], stdout=PIPE, bufsize=1,
+ universal_newlines=True) as p:
+ for line in p.stdout:
+ print(line, end='')
+ return
+
+def execute_coverage_test(tests_path, coverage_path, cov_report_dst_dir=None, cov_fail_under_rate=80, is_test=0):
+ """
+ creates a subprocess to use 'coverage' on a script. Every function inside the file
+ will be tested individually. The function returns verbose information about the outcome.
+
+ this function needs two inputs:
+ tests_path, a path that locates each test that should be executed
+ e.g.: "/home/scope_sorting/brecal/src/lib_brecal_utils/tests"
+
+ coverage_path, a path where the code is stored, which should be analyzed for coverage
+ e.g.: "/home/scope_sorting/brecal/src/lib_brecal_utils/brecal_utils"
+
+ optional:
+ cov_report_dst_dir, which determines, where the coverage report will be stored. This function then
+ creates & stores an .html and .xml report in that folder. default: None
+
+ cov_fail_under_rate, an integer which determines, when a coverage test should fail. Default: 80, meaning
+ that at least 80 % of the directory should be tested to pass the test.
+ """
+ import os
+ import pytest
+ from subprocess import Popen, PIPE
+
+ assert os.path.exists(tests_path), f"cannot find root directory {tests_path}"
+ assert os.path.exists(coverage_path), f"cannot find root directory {coverage_path}"
+
+ if cov_report_dst_dir is not None:
+ p_open_list_arguments = [
+ 'pytest',
+ f"{str(tests_path)}",
+ "-v",
+ "-vv",
+ "--durations=0",
+ "--cov-report=term",
+ f"--cov-report=html:{cov_report_dst_dir}",
+ f"--cov-report=xml:{cov_report_dst_dir}/coverage.xml",
+ f"--cov-fail-under={cov_fail_under_rate}",
+ f"--cov={str(coverage_path)}",
+ ]
+ else:
+ p_open_list_arguments = [
+ 'pytest',
+ f"{str(tests_path)}",
+ "-v",
+ "-vv",
+ "--durations=0",
+ "--cov-report=term",
+ f"--cov-fail-under={cov_fail_under_rate}",
+ f"--cov={str(coverage_path)}",
+ ]
+
+ with Popen(p_open_list_arguments, stdout=PIPE, bufsize=1,
+ universal_newlines=True) as p:
+ for line in p.stdout:
+ print(line, end='')
+ if is_test:
+ raise KeyboardInterrupt("is_test_interrupt")
diff --git a/src/lib_brecal_utils/brecal_utils/time_handling.py b/src/lib_brecal_utils/brecal_utils/time_handling.py
new file mode 100644
index 0000000..7571a67
--- /dev/null
+++ b/src/lib_brecal_utils/brecal_utils/time_handling.py
@@ -0,0 +1,21 @@
+import datetime
+
+def difference_to_then(event_time, tgt_time=None, make_absolute=False):
+ """
+ measures the difference between {tgt_time} and {event_time}. this function automatically converts the datetime.timedelta object to seconds.
+ tgt_time defaults to {now}, if it is not specified.
+
+ Note: using divmod(time_diff, interval_duration) may be interesting to determine, how many units of {interval_duration} have passed.
+ e.g.,
+ divmod(time_diff, 3600) returns a float of hours. This will then return a tuple
+
+ options:
+ make_absolute: bool. Whether to return an absolute difference
+
+ Returns: time_diff (float)
+ """
+ tgt_time = tgt_time or datetime.datetime.now()
+ time_diff = tgt_time - event_time
+ if make_absolute:
+ return abs(time_diff.total_seconds())
+ return time_diff.total_seconds()
diff --git a/src/lib_brecal_utils/brecal_utils/validators/__init__.py b/src/lib_brecal_utils/brecal_utils/validators/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/lib_brecal_utils/brecal_utils/validators/input_validation.py b/src/lib_brecal_utils/brecal_utils/validators/input_validation.py
new file mode 100644
index 0000000..57200e8
--- /dev/null
+++ b/src/lib_brecal_utils/brecal_utils/validators/input_validation.py
@@ -0,0 +1,165 @@
+
+####################################### InputValidation #######################################
+
+from abc import ABC, abstractmethod
+from BreCal.schemas.model import Ship, Shipcall, Berth, User, Participant
+
+class InputValidation():
+ def __init__(self):
+ self.build_supported_models_dictionary()
+ return
+
+ def build_supported_models_dictionary(self):
+ self.supported_models = {
+ Ship:ShipValidation(),
+ Shipcall:ShipcallValidation(),
+ Berth:BerthValidation(),
+ User:UserValidation(),
+ Participant:ParticipantValidation(),
+ }
+ return
+
+ def assert_if_not_supported(self, dataclass_object):
+ assert type(dataclass_object) in self.supported_models, f"unsupported model. Found: {type(dataclass_object)}"
+ return
+
+ def verify(self, dataclass_object):
+ self.assert_if_not_supported(dataclass_object)
+
+ # determine the type of the dataclass object. The internal dictionary 'supported_models' matches the dataclass object
+ # to the respective validation protocol
+ validator = self.supported_models.get(type(dataclass_object))
+
+ # check the object based on the rules within the matched validator
+ input_validation_state = validator.check(dataclass_object)
+ return input_validation_state
+
+
+class DataclassValidation(ABC):
+ """parent class of dataclas validators, which determines the outline of every object"""
+ def __init__(self):
+ return
+
+ def check(self, dataclass_object) -> (list, bool):
+ """
+ the 'check' method provides a default style, how each dataclass object is validated. It returns a list of violations
+ and a boolean, which determines, whether the check is passed successfully
+ """
+ all_rules = self.apply_all_rules(dataclass_object)
+ violations = self.filter_violations(all_rules)
+ input_validation_state = self.evaluate(violations)
+ return (violations, input_validation_state)
+
+ @abstractmethod
+ def apply_all_rules(self, dataclass_object) -> list:
+ """
+ the 'apply_all_rules' method is mandatory for any dataclass validation object. It should execute all validation rules and
+ return a list of tuples, where each element is (output_boolean, validation_name)
+ """
+ all_rules = [(True, 'blank_validation_rule')]
+ return all_rules
+
+ def filter_violations(self, all_rules):
+ """input: all_rules, a list of tuples, where each element is (output, validation_name), which are (bool, str). """
+ # if output is False, a violation is observed
+ violations = [result[1] for result in all_rules if not result[0]]
+ return violations
+
+ def evaluate(self, violations) -> bool:
+ input_validation_state = len(violations)==0
+ return input_validation_state
+
+
+
+class ShipcallValidation(DataclassValidation):
+ """an object that validates a Shipcall dataclass object"""
+ def __init__(self):
+ super().__init__()
+ return
+
+ def apply_all_rules(self, dataclass_object) -> list:
+ """apply all input validation rules to determine, whether there are violations. returns a list of tuples (output, validation_name)"""
+ raise NotImplementedError()
+ return all_rules
+
+
+from brecal_utils.validators.schema_validation import ship_bollard_pull_is_defined_or_is_not_tug, ship_bollard_pull_is_none_or_in_range, ship_callsign_len_is_seven_at_maximum, ship_eni_len_is_eight, ship_imo_len_is_seven, ship_length_in_range, ship_participant_id_is_defined_or_is_not_tug, ship_participant_id_is_none_or_int, ship_width_in_range
+# skip: ship_max_draft_is_defined_or_is_not_tug, ship_max_draft_is_none_or_in_range,
+class ShipValidation(DataclassValidation):
+ """an object that validates a Ship dataclass object"""
+ def __init__(self):
+ super().__init__()
+ return
+
+ def apply_all_rules(self, dataclass_object) -> list:
+ """apply all input validation rules to determine, whether there are violations. returns a list of tuples (output, validation_name)"""
+ # skip: ship_max_draft_is_defined_or_is_not_tug, ship_max_draft_is_none_or_in_range,
+ """
+ #TODO_ship_max_draft
+ with pytest.raises(AttributeError, match="'Ship' object has no attribute 'max_draft'"):
+ assert ship_max_draft_in_range(ship)[0], f"max draft of a ship must be between 0 and 20 meters"
+ assert ship_max_draft_is_none_or_in_range(ship)[0], f"the max_draft should either be undefined or between 0 and 20 meters"
+ """
+
+ # list comprehension: every function becomes part of the loop and will be executed. Each function is wrapped and provides (output, validation_name)
+ all_rules = [
+ # tuple: (output, validation_name)
+ check_rule(dataclass_object)
+
+ for check_rule in [
+ ship_bollard_pull_is_defined_or_is_not_tug,
+ ship_bollard_pull_is_none_or_in_range,
+ ship_callsign_len_is_seven_at_maximum,
+ ship_eni_len_is_eight,
+ ship_imo_len_is_seven,
+ ship_length_in_range,
+ ship_participant_id_is_defined_or_is_not_tug,
+ ship_participant_id_is_none_or_int,
+ ship_width_in_range
+ ]
+ ]
+ return all_rules
+
+class BerthValidation(DataclassValidation):
+ """an object that validates a Berth dataclass object"""
+ def __init__(self):
+ super().__init__()
+ return
+
+ def apply_all_rules(self, dataclass_object) -> list:
+ """apply all input validation rules to determine, whether there are violations. returns a list of tuples (output, validation_name)"""
+ raise NotImplementedError()
+ return all_rules
+
+class UserValidation(DataclassValidation):
+ """an object that validates a User dataclass object"""
+ def __init__(self):
+ super().__init__()
+ return
+
+ def apply_all_rules(self, dataclass_object) -> list:
+ """apply all input validation rules to determine, whether there are violations. returns a list of tuples (output, validation_name)"""
+ raise NotImplementedError()
+ return all_rules
+
+from brecal_utils.validators.schema_validation import participant_postal_code_len_is_five
+class ParticipantValidation(DataclassValidation):
+ """an object that validates a Participant dataclass object"""
+ def __init__(self):
+ super().__init__()
+ return
+
+ def apply_all_rules(self, dataclass_object) -> list:
+ """apply all input validation rules to determine, whether there are violations. returns a list of tuples (output, validation_name)"""
+
+ # list comprehension: every function becomes part of the loop and will be executed. Each function is wrapped and provides (output, validation_name)
+ all_rules = [
+ # tuple: (output, validation_name)
+ check_rule(dataclass_object)
+
+ for check_rule in [
+ participant_postal_code_len_is_five,
+ ]
+ ]
+ return all_rules
+
diff --git a/src/lib_brecal_utils/brecal_utils/validators/schema_validation.py b/src/lib_brecal_utils/brecal_utils/validators/schema_validation.py
new file mode 100644
index 0000000..384b2f6
--- /dev/null
+++ b/src/lib_brecal_utils/brecal_utils/validators/schema_validation.py
@@ -0,0 +1,139 @@
+# wrapper: every validation function returns a tuple of (validation_state, validation_name)
+# example: validate_ship_eni_length might return the tuple (True, 'ship_eni_length')
+# thereby, one could always know, which test causes an issue
+
+####################################### general functions #######################################
+
+def validation_state_and_validation_name(validation_name):
+ """
+ can wrap arbitrary functions, so they return (output, validation_name)-tuples
+ usage example:
+ @validation_state_and_validation_name("ship_eni_length")
+ def validate_ship_eni_length(ship):
+ return length_matches_exactly(ship.eni,8)
+ """
+ def wrapper(validation_fct):
+ def decorated_fct(*args, **kwargs):
+ return (validation_fct(*args, **kwargs), validation_name)
+ return decorated_fct
+ return wrapper
+
+def value_in_range(query_value, start_range, end_range):
+ """determines, whether the query_value is greater than start_range, but smaller than end_range. Returns bool"""
+ return start_range src_time)
+ if the tgt_time is in the past, it is a negative value (tgt_time < src_time)
+
+ returns the delta between tgt_time and src_time as a float of minutes (or the optionally provided unit)
+
+ options:
+ unit: str, which defaults to 'm' (minutes). 'h' (hours) or 's' (seconds) are also common units. Determines the unit of the output time delta
+ """
+ # convert np.datetime64
+ if isinstance(src_time, pd.Timestamp):
+ src_time = src_time.to_datetime64()
+
+ if isinstance(tgt_time, pd.Timestamp):
+ tgt_time = tgt_time.to_datetime64()
+
+ if isinstance(src_time, datetime.datetime):
+ src_time = np.datetime64(src_time)
+
+ if isinstance(tgt_time, datetime.datetime):
+ tgt_time = np.datetime64(tgt_time)
+
+ delta = tgt_time - src_time
+ minute_delta = delta / np.timedelta64(1, unit)
+ return minute_delta
+
+ def time_delta_from_now_to_tgt(self, tgt_time, unit="m"):
+ return self.time_delta(datetime.datetime.now(), tgt_time=tgt_time, unit=unit)
+
+ def time_inbetween(self, query_time:datetime.datetime, start_time:datetime.datetime, end_time:datetime.datetime) -> bool:
+ """
+ checks, whether the query time is inbetween the start & end time. Returns a bool to indicate that.
+
+ Example:
+ a = datetime.datetime(2017, 5, 16, 8, 21, 10)
+ b = datetime.datetime(2017, 5, 17, 8, 21, 10)
+ c = datetime.datetime(2017, 5, 18, 8, 21, 10)
+
+ is b between a and c? -> yes. Returns True
+ is c between a and b? -> no. Returns False
+
+ returns bool
+ """
+ assert isinstance(query_time, datetime.datetime)
+ assert isinstance(start_time, datetime.datetime)
+ assert isinstance(end_time, datetime.datetime)
+
+ return start_time <= query_time <= end_time
+
+ def time_inbetween_absolute_delta(self, query_time:datetime.datetime, start_time:datetime.datetime, end_time:datetime.datetime) -> tuple:
+ """
+ similarly to self.time_inbetween, this function compares a query_time with the provided start and end time.
+ however, this function instead returns timedelta objects, which show the difference towards start and end
+
+ this function applies abs() to return only absolute deviations. Thereby, -23 becomes +23
+
+ returns: tuple(absolute_start_delta, absolute_end_delta)
+ """
+ return (abs(query_time-start_time), abs(query_time-end_time))
+
+ def compare_query_is_inbetween_list(self, query_time, list_of_other_times) -> list:
+ list_of_bools = [
+ self.time_inbetween(query_time, time_elem_begin, time_elem_end)
+ for (time_elem_begin, time_elem_end) in list_of_other_times
+ ]
+ return list_of_bools
+
+ def query_time_any_inbetween(self, query_time, list_of_other_times):
+ """
+ given a query_time element, the element will be compared to every element in a list, where each
+ element is a tuple of (start_time, end_time)
+ """
+ if len(list_of_other_times)==0:
+ # the time is not inbetween, if the provided list is empty
+ return False
+
+ list_of_bools = self.compare_query_is_inbetween_list(query_time, list_of_other_times)
+ return np.any(list_of_bools), list_of_bools
+
+
diff --git a/src/lib_brecal_utils/brecal_utils/validators/validation_rule_functions.py b/src/lib_brecal_utils/brecal_utils/validators/validation_rule_functions.py
new file mode 100644
index 0000000..39a34c7
--- /dev/null
+++ b/src/lib_brecal_utils/brecal_utils/validators/validation_rule_functions.py
@@ -0,0 +1,763 @@
+import inspect
+import types
+from brecal_utils.database.enums import ParticipantType, ShipcallType, ParticipantwiseTimeDelta
+import numpy as np
+import pandas as pd
+from brecal_utils.validators.time_logic import TimeLogic
+from brecal_utils.database.enums import StatusFlags
+#from brecal_utils.validators.schema_validation import validation_state_and_validation_name
+
+
+class ValidationRuleBaseFunctions():
+ """
+ Base object with individual functions, which the {ValidationRuleFunctions}-child refers to.
+ This parent class provides base functions and helps to restructure the code in a more comprehensible way.
+ """
+ def __init__(self, sql_handler):
+ self.sql_handler = sql_handler
+ self.time_logic = TimeLogic()
+
+ def check_time_delta_violation_query_time_to_now(self, query_time:pd.Timestamp, key_time:pd.Timestamp, threshold:float)->bool:
+ """
+ # base function for all validation rules in the group {0001} A-L
+
+ measures the time between NOW and query_time.
+ When the query_time lays in the past, the delta is negative
+ when the query_time lays in the future, the delta is positive
+
+ returns a violation state depending on whether the delta is
+ Violation, if: 0 >= delta > threshold
+
+ When the key time is defined (not None), there is no violation. Returns False
+
+ options:
+ query_time: will be used to measure the time difference of 'now' until the query time
+ key_time: will be used to check, whether the respective key already has a value
+ threshold: threshold where a time difference becomes crucial. When the delta is below the threshold, a violation might occur
+ """
+ # rule is not applicable -> return 'GREEN'
+ if key_time is not None:
+ return False
+
+ # otherwise, this rule applies and the difference between 'now' and the query time is measured
+ delta = self.time_logic.time_delta_from_now_to_tgt(tgt_time=query_time, unit="m")
+
+ # a violation occurs, when the delta (in minutes) exceeds the specified threshold of a participant
+ # to prevent past-events from triggering violations, negative values are ignored
+ # Violation, if 0 >= delta >= threshold
+ violation_state = (delta >= 0) and (delta<=threshold)
+ return violation_state
+
+ def check_participants_agree_on_estimated_time(self, shipcall, query, df_times, applicable_shipcall_type)->bool:
+ """
+ # base function for all validation rules in the group {0002} A-C
+
+ compares, whether the participants agree on the estimated time (of arrival or departure), depending on
+ whether the shipcall type is incoming, outgoing or shifting.
+
+ No violations are observed, when
+ - the shipcall belongs to a different type than the rule expects
+ - there are no matching times for the provided {query} (e.g., "eta_berth")
+
+ Instead of comparing each individual result, this function counts the amount of unique instances.
+ When there is not only one unique value, there are deviating time estimates, and a violation occurs
+
+ returns: violation_state (bool)
+ """
+ # shipcall type filter: consider only shipcalls, where the type matches
+ if shipcall.type != applicable_shipcall_type.value:
+ violation_state = False
+ return violation_state
+
+ # filter by participant types of interest (agency, mooring, portauthority/administration, pilot, tug)
+ participant_types = [ParticipantType.AGENCY.value, ParticipantType.MOORING.value, ParticipantType.PORT_ADMINISTRATION.value, ParticipantType.PILOT.value, ParticipantType.TUG.value]
+ df_times = df_times.loc[df_times["participant_type"].isin(participant_types),:]
+
+ # exclude missing entries
+ df_times.loc[~df_times[query].isnull(),:]
+
+ # when there are no entries left (no entries are provided), skip
+ if len(df_times)==0:
+ violation_state = False
+ return violation_state
+
+ # there should only be one eta_berth, when all participants have provided the same time
+ # this equates to the same criteria as checking, whether
+ # times_agency.eta_berth==times_mooring.eta_berth==times_portadministration.eta_berth==times_pilot.eta_berth==times_tug.eta_berth
+ unique_times = len(pd.unique(df_times.loc[:,query]))
+ violation_state = unique_times!=1
+ return violation_state
+
+ def check_unique_shipcall_counts(self, query:str, rounding="min", maximum_threshold=3)->bool:
+ """
+ # base function for all validation rules in the group {0005} A&B
+
+ compares how many unique times are found for the provided {query} (e.g., "eta_berth")
+ This function rounds the results, counts the unique values and returns a boolean state, whether the {maximum_threshold} is exceeded
+ """
+ # filter the df: keep only times_agents
+ # filter out all NaN and NaT entries
+ times_agency = self.sql_handler.get_times_for_agency(non_null_column=query)
+
+ # get values and optionally round the values
+ (values, unique, counts) = self.sql_handler.get_unique_ship_counts(all_df_times=times_agency, query=query, rounding=rounding, maximum_threshold=maximum_threshold)
+
+ # when ANY of the unique values exceeds the threshold, a violation is observed
+ violation_state = np.any(np.greater(counts, maximum_threshold))
+ return violation_state
+
+
+class ValidationRuleFunctions(ValidationRuleBaseFunctions):
+ """
+ an accumulation object that makes sure, that any validation rule is translated to a function with default naming convention and
+ return types. Each function should return a ValidationRuleState enumeration object and a description string to which validation rule
+ the result belongs. These are returned as tuples (ValidationRuleState, validation_name)
+ Each rule should have the same input arguments (self, shipcall, df_times, *args, **kwargs)
+
+ The object makes heavy use of calls from an SQLHandler object, which provides functions for dataframe access and filtering.
+
+ each validation_name is generated by calling the function inside a method
+ validation_name = inspect.currentframe().f_code.co_name # validation_name then returns the name of the method from where 'currentframe()' was called.
+
+ # example:
+ #def validation_rule_fct_example(self, shipcall, df_times):
+ #validation_name = inspect.currentframe().f_code.co_name
+ #return (ValidationRuleState.NONE, validation_name)
+ """
+ def __init__(self, sql_handler):
+ super().__init__(sql_handler)
+ return
+
+ def get_validation_rule_functions(self):
+ """return a list of all methods in this object, which are all validation rule functions."""
+ return [self.__getattribute__(mthd_) for mthd_ in dir(self) if ('validation_rule_fct' in mthd_) and (isinstance(self.__getattribute__(mthd_), types.MethodType))]
+
+ def validation_rule_fct_missing_time_agency_berth_eta(self, shipcall, df_times, *args, **kwargs):
+ """
+ Code: #0001-A
+ Type: Local Rule
+ Description: this validation checks, whether there is a missing time. When the difference between an event (e.g., the shipcall eta) is below
+ a certain threshold (e.g., 20 hours), a violation occurs
+
+ 0001-A:
+ - Checks, if times_agency.eta_berth is filled in.
+ - Measures the difference between 'now' and 'shipcall.eta'.
+ """
+ # check, if the header is filled in (agency)
+ if len(df_times.loc[df_times["participant_type"].isin([ParticipantType.AGENCY.value])]) != 1:
+ return (StatusFlags.GREEN, None)
+
+ # preparation: obtain the correct times of the participant, define the query time and the key time
+ times_agency = self.sql_handler.get_times_for_participant_type(df_times, participant_type=ParticipantType.AGENCY.value)
+ query_time = shipcall.eta
+ key_time = times_agency.eta_berth
+ threshold = ParticipantwiseTimeDelta.AGENCY
+ violation_state = self.check_time_delta_violation_query_time_to_now(query_time=query_time, key_time=key_time, threshold=threshold)
+
+ if violation_state:
+ validation_name = inspect.currentframe().f_code.co_name
+ return (StatusFlags.YELLOW, validation_name)
+ else:
+ return (StatusFlags.GREEN, None)
+
+ def validation_rule_fct_missing_time_agency_berth_etd(self, shipcall, df_times, *args, **kwargs):
+ """
+ Code: #0001-B
+ Type: Local Rule
+ Description: this validation checks, whether there is a missing time. When the difference between an event (e.g., the shipcall eta) is below
+ a certain threshold (e.g., 20 hours), a violation occurs
+
+ 0001-B:
+ - Checks, if times_agency.etd_berth is filled in.
+ - Measures the difference between 'now' and 'shipcall.etd'.
+ """
+ # check, if the header is filled in (agency)
+ if len(df_times.loc[df_times["participant_type"].isin([ParticipantType.AGENCY.value])]) != 1:
+ return (StatusFlags.GREEN, None)
+
+ # preparation: obtain the correct times of the participant, define the query time and the key time
+ times_agency = self.sql_handler.get_times_for_participant_type(df_times, participant_type=ParticipantType.AGENCY.value)
+ query_time = shipcall.etd
+ key_time = times_agency.etd_berth
+ threshold = ParticipantwiseTimeDelta.AGENCY
+ violation_state = self.check_time_delta_violation_query_time_to_now(query_time=query_time, key_time=key_time, threshold=threshold)
+
+ if violation_state:
+ validation_name = inspect.currentframe().f_code.co_name
+ return (StatusFlags.YELLOW, validation_name)
+ else:
+ return (StatusFlags.GREEN, None)
+
+ def validation_rule_fct_missing_time_mooring_berth_eta(self, shipcall, df_times, *args, **kwargs):
+ """
+ Code: #0001-C
+ Type: Local Rule
+ Description: this validation checks, whether there is a missing time. When the difference between an event (e.g., the shipcall eta) is below
+ a certain threshold (e.g., 20 hours), a violation occurs
+
+ 0001-C:
+ - Checks, if times_mooring.eta_berth is filled in.
+ - Measures the difference between 'now' and 'times_agency.eta_berth'.
+ """
+ # check, if the header is filled in (agency & MOORING)
+ if len(df_times.loc[df_times["participant_type"].isin([ParticipantType.AGENCY.value, ParticipantType.MOORING.value])]) != 2:
+ return (StatusFlags.GREEN, None)
+
+ # preparation: obtain the correct times of the participant, define the query time and the key time
+ times_agency = self.sql_handler.get_times_for_participant_type(df_times, participant_type=ParticipantType.AGENCY.value)
+ times_mooring = self.sql_handler.get_times_for_participant_type(df_times, participant_type=ParticipantType.MOORING.value)
+
+ query_time = times_agency.eta_berth
+ key_time = times_mooring.eta_berth
+ threshold = ParticipantwiseTimeDelta.MOORING
+ violation_state = self.check_time_delta_violation_query_time_to_now(query_time=query_time, key_time=key_time, threshold=threshold)
+
+ if violation_state:
+ validation_name = inspect.currentframe().f_code.co_name
+ return (StatusFlags.YELLOW, validation_name)
+ else:
+ return (StatusFlags.GREEN, None)
+
+ def validation_rule_fct_missing_time_mooring_berth_etd(self, shipcall, df_times, *args, **kwargs):
+ """
+ Code: #0001-D
+ Type: Local Rule
+ Description: this validation checks, whether there is a missing time. When the difference between an event (e.g., the shipcall eta) is below
+ a certain threshold (e.g., 20 hours), a violation occurs
+
+ 0001-D:
+ - Checks, if times_mooring.etd_berth is filled in.
+ - Measures the difference between 'now' and 'times_agency.etd_berth'.
+ """
+ # check, if the header is filled in (agency & MOORING)
+ if len(df_times.loc[df_times["participant_type"].isin([ParticipantType.AGENCY.value, ParticipantType.MOORING.value])]) != 2:
+ return (StatusFlags.GREEN, None)
+
+ # preparation: obtain the correct times of the participant, define the query time and the key time
+ times_agency = self.sql_handler.get_times_for_participant_type(df_times, participant_type=ParticipantType.AGENCY.value)
+ times_mooring = self.sql_handler.get_times_for_participant_type(df_times, participant_type=ParticipantType.MOORING.value)
+
+ query_time = times_agency.etd_berth
+ key_time = times_mooring.etd_berth
+ threshold = ParticipantwiseTimeDelta.MOORING
+ violation_state = self.check_time_delta_violation_query_time_to_now(query_time=query_time, key_time=key_time, threshold=threshold)
+
+ if violation_state:
+ validation_name = inspect.currentframe().f_code.co_name
+ return (StatusFlags.YELLOW, validation_name)
+ else:
+ return (StatusFlags.GREEN, None)
+
+ def validation_rule_fct_missing_time_portadministration_berth_eta(self, shipcall, df_times, *args, **kwargs):
+ """
+ Code: #0001-F
+ Type: Local Rule
+ Description: this validation checks, whether there is a missing time. When the difference between an event (e.g., the shipcall eta) is below
+ a certain threshold (e.g., 20 hours), a violation occurs
+
+ 0001-F:
+ - Checks, if times_port_administration.eta_berth is filled in.
+ - Measures the difference between 'now' and 'times_agency.eta_berth'.
+ """
+ # check, if the header is filled in (agency & PORT_ADMINISTRATION)
+ if len(df_times.loc[df_times["participant_type"].isin([ParticipantType.AGENCY.value, ParticipantType.PORT_ADMINISTRATION.value])]) != 2:
+ return (StatusFlags.GREEN, None)
+
+ # preparation: obtain the correct times of the participant, define the query time and the key time
+ times_agency = self.sql_handler.get_times_for_participant_type(df_times, participant_type=ParticipantType.AGENCY.value)
+ times_port_administration = self.sql_handler.get_times_for_participant_type(df_times, participant_type=ParticipantType.PORT_ADMINISTRATION.value)
+
+ query_time = times_agency.eta_berth
+ key_time = times_port_administration.eta_berth
+ threshold = ParticipantwiseTimeDelta.PORT_ADMINISTRATION
+ violation_state = self.check_time_delta_violation_query_time_to_now(query_time=query_time, key_time=key_time, threshold=threshold)
+
+ if violation_state:
+ validation_name = inspect.currentframe().f_code.co_name
+ return (StatusFlags.YELLOW, validation_name)
+ else:
+ return (StatusFlags.GREEN, None)
+
+ def validation_rule_fct_missing_time_portadministration_berth_etd(self, shipcall, df_times, *args, **kwargs):
+ """
+ Code: #0001-G
+ Type: Local Rule
+ Description: this validation checks, whether there is a missing time. When the difference between an event (e.g., the shipcall eta) is below
+ a certain threshold (e.g., 20 hours), a violation occurs
+
+ 0001-G:
+ - Checks, if times_port_administration.etd_berth is filled in.
+ - Measures the difference between 'now' and 'times_agency.etd_berth'.
+ """
+ # check, if the header is filled in (agency & PORT_ADMINISTRATION)
+ if len(df_times.loc[df_times["participant_type"].isin([ParticipantType.AGENCY.value, ParticipantType.PORT_ADMINISTRATION.value])]) != 2:
+ return (StatusFlags.GREEN, None)
+
+ # preparation: obtain the correct times of the participant, define the query time and the key time
+ times_agency = self.sql_handler.get_times_for_participant_type(df_times, participant_type=ParticipantType.AGENCY.value)
+ times_port_administration = self.sql_handler.get_times_for_participant_type(df_times, participant_type=ParticipantType.PORT_ADMINISTRATION.value)
+
+ query_time = times_agency.etd_berth
+ key_time = times_port_administration.etd_berth
+ threshold = ParticipantwiseTimeDelta.PORT_ADMINISTRATION
+ violation_state = self.check_time_delta_violation_query_time_to_now(query_time=query_time, key_time=key_time, threshold=threshold)
+
+ if violation_state:
+ validation_name = inspect.currentframe().f_code.co_name
+ return (StatusFlags.YELLOW, validation_name)
+ else:
+ return (StatusFlags.GREEN, None)
+
+ def validation_rule_fct_missing_time_pilot_berth_eta(self, shipcall, df_times, *args, **kwargs):
+ """
+ Code: #0001-H
+ Type: Local Rule
+ Description: this validation checks, whether there is a missing time. When the difference between an event (e.g., the shipcall eta) is below
+ a certain threshold (e.g., 20 hours), a violation occurs
+
+ 0001-H:
+ - Checks, if times_pilot.eta_berth is filled in.
+ - Measures the difference between 'now' and 'times_agency.eta_berth'.
+ """
+ # check, if the header is filled in (agency & PILOT)
+ if len(df_times.loc[df_times["participant_type"].isin([ParticipantType.AGENCY.value, ParticipantType.PILOT.value])]) != 2:
+ return (StatusFlags.GREEN, None)
+
+ # preparation: obtain the correct times of the participant, define the query time and the key time
+ times_agency = self.sql_handler.get_times_for_participant_type(df_times, participant_type=ParticipantType.AGENCY.value)
+ times_pilot = self.sql_handler.get_times_for_participant_type(df_times, participant_type=ParticipantType.PILOT.value)
+
+ query_time = times_agency.eta_berth
+ key_time = times_pilot.eta_berth
+ threshold = ParticipantwiseTimeDelta.PILOT
+ violation_state = self.check_time_delta_violation_query_time_to_now(query_time=query_time, key_time=key_time, threshold=threshold)
+
+ if violation_state:
+ validation_name = inspect.currentframe().f_code.co_name
+ return (StatusFlags.YELLOW, validation_name)
+ else:
+ return (StatusFlags.GREEN, None)
+
+ def validation_rule_fct_missing_time_pilot_berth_etd(self, shipcall, df_times, *args, **kwargs):
+ """
+ Code: #0001-I
+ Type: Local Rule
+ Description: this validation checks, whether there is a missing time. When the difference between an event (e.g., the shipcall eta) is below
+ a certain threshold (e.g., 20 hours), a violation occurs
+
+ 0001-I:
+ - Checks, if times_pilot.etd_berth is filled in.
+ - Measures the difference between 'now' and 'times_agency.etd_berth'.
+ """
+ # check, if the header is filled in (agency & PILOT)
+ if len(df_times.loc[df_times["participant_type"].isin([ParticipantType.AGENCY.value, ParticipantType.PILOT.value])]) != 2:
+ return (StatusFlags.GREEN, None)
+
+ # preparation: obtain the correct times of the participant, define the query time and the key time
+ times_agency = self.sql_handler.get_times_for_participant_type(df_times, participant_type=ParticipantType.AGENCY.value)
+ times_pilot = self.sql_handler.get_times_for_participant_type(df_times, participant_type=ParticipantType.PILOT.value)
+
+ query_time = times_agency.etd_berth
+ key_time = times_pilot.etd_berth
+ threshold = ParticipantwiseTimeDelta.PILOT
+ violation_state = self.check_time_delta_violation_query_time_to_now(query_time=query_time, key_time=key_time, threshold=threshold)
+
+ if violation_state:
+ validation_name = inspect.currentframe().f_code.co_name
+ return (StatusFlags.YELLOW, validation_name)
+ else:
+ return (StatusFlags.GREEN, None)
+
+ def validation_rule_fct_missing_time_tug_berth_eta(self, shipcall, df_times, *args, **kwargs):
+ """
+ Code: #0001-J
+ Type: Local Rule
+ Description: this validation checks, whether there is a missing time. When the difference between an event (e.g., the shipcall eta) is below
+ a certain threshold (e.g., 20 hours), a violation occurs
+
+ 0001-J:
+ - Checks, if times_tug.eta_berth is filled in.
+ - Measures the difference between 'now' and 'times_agency.eta_berth'.
+ """
+ # check, if the header is filled in (agency & TUG)
+ if len(df_times.loc[df_times["participant_type"].isin([ParticipantType.AGENCY.value, ParticipantType.TUG.value])]) != 2:
+ return (StatusFlags.GREEN, None)
+
+ # preparation: obtain the correct times of the participant, define the query time and the key time
+ times_agency = self.sql_handler.get_times_for_participant_type(df_times, participant_type=ParticipantType.AGENCY.value)
+ times_tug = self.sql_handler.get_times_for_participant_type(df_times, participant_type=ParticipantType.TUG.value)
+
+ query_time = times_agency.eta_berth
+ key_time = times_tug.eta_berth
+ threshold = ParticipantwiseTimeDelta.TUG
+ violation_state = self.check_time_delta_violation_query_time_to_now(query_time=query_time, key_time=key_time, threshold=threshold)
+
+ if violation_state:
+ validation_name = inspect.currentframe().f_code.co_name
+ return (StatusFlags.YELLOW, validation_name)
+ else:
+ return (StatusFlags.GREEN, None)
+
+ def validation_rule_fct_missing_time_tug_berth_etd(self, shipcall, df_times, *args, **kwargs):
+ """
+ Code: #0001-K
+ Type: Local Rule
+ Description: this validation checks, whether there is a missing time. When the difference between an event (e.g., the shipcall eta) is below
+ a certain threshold (e.g., 20 hours), a violation occurs
+
+ 0001-K:
+ - Checks, if times_tug.etd_berth is filled in.
+ - Measures the difference between 'now' and 'times_agency.etd_berth'.
+ """
+ # check, if the header is filled in (agency & TUG)
+ if len(df_times.loc[df_times["participant_type"].isin([ParticipantType.AGENCY.value, ParticipantType.TUG.value])]) != 2:
+ return (StatusFlags.GREEN, None)
+
+ # preparation: obtain the correct times of the participant, define the query time and the key time
+ times_agency = self.sql_handler.get_times_for_participant_type(df_times, participant_type=ParticipantType.AGENCY.value)
+ times_tug = self.sql_handler.get_times_for_participant_type(df_times, participant_type=ParticipantType.TUG.value)
+
+ query_time = times_agency.etd_berth
+ key_time = times_tug.etd_berth
+ threshold = ParticipantwiseTimeDelta.TUG
+ violation_state = self.check_time_delta_violation_query_time_to_now(query_time=query_time, key_time=key_time, threshold=threshold)
+
+ if violation_state:
+ validation_name = inspect.currentframe().f_code.co_name
+ return (StatusFlags.YELLOW, validation_name)
+ else:
+ return (StatusFlags.GREEN, None)
+
+ def validation_rule_fct_missing_time_terminal_berth_eta(self, shipcall, df_times, *args, **kwargs):
+ """
+ Code: #0001-L
+ Type: Local Rule
+ Description: this validation checks, whether there is a missing time. When the difference between an event (e.g., the shipcall eta) is below
+ a certain threshold (e.g., 20 hours), a violation occurs
+
+ 0001-L:
+ - Checks, if times_terminal.eta_berth is filled in.
+ - Measures the difference between 'now' and 'times_agency.eta_berth'.
+ """
+ # check, if the header is filled in (agency & terminal)
+ if len(df_times.loc[df_times["participant_type"].isin([ParticipantType.AGENCY.value, ParticipantType.TERMINAL.value])]) != 2:
+ return (StatusFlags.GREEN, None)
+
+ # preparation: obtain the correct times of the participant, define the query time and the key time
+ times_agency = self.sql_handler.get_times_for_participant_type(df_times, participant_type=ParticipantType.AGENCY.value)
+ times_terminal = self.sql_handler.get_times_for_participant_type(df_times, participant_type=ParticipantType.TERMINAL.value)
+
+ query_time = times_agency.eta_berth
+ key_time = times_terminal.eta_berth
+ threshold = ParticipantwiseTimeDelta.TERMINAL
+ violation_state = self.check_time_delta_violation_query_time_to_now(query_time=query_time, key_time=key_time, threshold=threshold)
+
+ if violation_state:
+ validation_name = inspect.currentframe().f_code.co_name
+ return (StatusFlags.YELLOW, validation_name)
+ else:
+ return (StatusFlags.GREEN, None)
+
+ def validation_rule_fct_missing_time_terminal_berth_etd(self, shipcall, df_times, *args, **kwargs):
+ """
+ Code: #0001-K
+ Type: Local Rule
+ Description: this validation checks, whether there is a missing time. When the difference between an event (e.g., the shipcall eta) is below
+ a certain threshold (e.g., 20 hours), a violation occurs
+
+ 0001-K:
+ - Checks, if times_terminal.etd_berth is filled in.
+ - Measures the difference between 'now' and 'times_agency.etd_berth'.
+ """
+ # check, if the header is filled in (agency & terminal)
+ if len(df_times.loc[df_times["participant_type"].isin([ParticipantType.AGENCY.value, ParticipantType.TERMINAL.value])]) != 2:
+ return (StatusFlags.GREEN, None)
+
+ # preparation: obtain the correct times of the participant, define the query time and the key time
+ times_agency = self.sql_handler.get_times_for_participant_type(df_times, participant_type=ParticipantType.AGENCY.value)
+ times_terminal = self.sql_handler.get_times_for_participant_type(df_times, participant_type=ParticipantType.TERMINAL.value)
+
+ query_time = times_agency.etd_berth
+ key_time = times_terminal.etd_berth
+ threshold = ParticipantwiseTimeDelta.TERMINAL
+ violation_state = self.check_time_delta_violation_query_time_to_now(query_time=query_time, key_time=key_time, threshold=threshold)
+
+ if violation_state:
+ validation_name = inspect.currentframe().f_code.co_name
+ return (StatusFlags.YELLOW, validation_name)
+ else:
+ return (StatusFlags.GREEN, None)
+
+
+ def validation_rule_fct_shipcall_incoming_participants_disagree_on_eta(self, shipcall, df_times, *args, **kwargs):
+ """
+ Code: #0002-A
+ Type: Local Rule
+ Description: this validation checks, whether the participants expect different ETA times
+ Filter: only applies to incoming shipcalls
+ """
+ query = "eta_berth"
+
+ violation_state = self.check_participants_agree_on_estimated_time(
+ shipcall = shipcall,
+
+ query=query,
+ df_times=df_times,
+ applicable_shipcall_type=ShipcallType.INCOMING
+ )
+
+ if violation_state:
+ validation_name = inspect.currentframe().f_code.co_name
+ return (StatusFlags.RED, validation_name)
+ else:
+ return (StatusFlags.GREEN, None)
+
+ def validation_rule_fct_shipcall_outgoing_participants_disagree_on_etd(self, shipcall, df_times, *args, **kwargs):
+ """
+ Code: #0002-B
+ Type: Local Rule
+ Description: this validation checks, whether the participants expect different ETA times
+ Filter: only applies to outgoing shipcalls
+ """
+ query = "etd_berth"
+
+ violation_state = self.check_participants_agree_on_estimated_time(
+ shipcall = shipcall,
+
+ query=query,
+ df_times=df_times,
+ applicable_shipcall_type=ShipcallType.OUTGOING
+ )
+
+ if violation_state:
+ validation_name = inspect.currentframe().f_code.co_name
+ return (StatusFlags.RED, validation_name)
+ else:
+ return (StatusFlags.GREEN, None)
+
+ def validation_rule_fct_shipcall_shifting_participants_disagree_on_eta_or_etd(self, shipcall, df_times, *args, **kwargs):
+ """
+ Code: #0002-C
+ Type: Local Rule
+ Description: this validation checks, whether the participants expect different ETA or ETD times
+ Filter: only applies to shifting shipcalls
+ """
+ violation_state_eta = self.check_participants_agree_on_estimated_time(
+ shipcall = shipcall,
+
+ query="eta_berth",
+ df_times=df_times,
+ applicable_shipcall_type=ShipcallType.SHIFTING
+ )
+
+ violation_state_etd = self.check_participants_agree_on_estimated_time(
+ shipcall = shipcall,
+
+ query="etd_berth",
+ df_times=df_times,
+ applicable_shipcall_type=ShipcallType.SHIFTING
+ )
+
+ # apply 'eta_berth' check
+ # apply 'etd_berth'
+ # violation: if either 'eta_berth' or 'etd_berth' is violated
+ # functionally, this is the same as individually comparing all times for the participants
+ # times_agency.eta_berth==times_mooring.eta_berth==times_portadministration.eta_berth==times_pilot.eta_berth==times_tug.eta_berth
+ # times_agency.etd_berth==times_mooring.etd_berth==times_portadministration.etd_berth==times_pilot.etd_berth==times_tug.etd_berth
+ violation_state = (violation_state_eta) or (violation_state_etd)
+
+ if violation_state:
+ validation_name = inspect.currentframe().f_code.co_name
+ return (StatusFlags.RED, validation_name)
+ else:
+ return (StatusFlags.GREEN, None)
+
+ def validation_rule_fct_eta_time_not_in_operation_window(self, shipcall, df_times, *args, **kwargs):
+ """
+ Code: #0003-A
+ Type: Local Rule
+ Description: this validation checks, whether the ETA time is between the provided operations window of the terminal
+
+ query time: eta_berth (times_agency)
+ start_time & end_time: operations_start & operations_end (times_terminal)
+ """
+ # check, if the header is filled in (agency & terminal)
+ if len(df_times.loc[df_times["participant_type"].isin([ParticipantType.AGENCY.value, ParticipantType.TERMINAL.value])]) != 2:
+ return (StatusFlags.GREEN, None)
+
+ # get agency & terminal times
+ times_agency = self.sql_handler.get_times_for_participant_type(df_times, participant_type=ParticipantType.AGENCY)
+ times_terminal = self.sql_handler.get_times_for_participant_type(df_times, participant_type=ParticipantType.TERMINAL)
+
+ if (times_terminal.operations_end is pd.NaT) or (times_agency.etd_berth is pd.NaT):
+ return (StatusFlags.GREEN, None)
+
+ # check, whether the start of operations is AFTER the estimated arrival time
+ violation_state = times_terminal.operations_start times_agency.etd_berth
+
+ if violation_state:
+ validation_name = inspect.currentframe().f_code.co_name
+ return (StatusFlags.RED, validation_name)
+ else:
+ return (StatusFlags.GREEN, None)
+
+ def validation_rule_fct_eta_time_not_in_tidal_window(self, shipcall, df_times, *args, **kwargs):
+ """
+ Code: #0004-A
+ Type: Local Rule
+ Description: this validation checks, whether the ETA time is between the provided tidal window
+
+ query time: eta_berth (times_agency)
+ start_time & end_time: tidal_window_from & tidal_window_to (shipcall)
+ """
+ # check, if the header is filled in (agency)
+ if len(df_times.loc[df_times["participant_type"].isin([ParticipantType.AGENCY.value])]) != 1:
+ return (StatusFlags.GREEN, None)
+ times_agency = self.sql_handler.get_times_for_participant_type(df_times, participant_type=ParticipantType.AGENCY)
+
+ # requirements: tidal window (from & to) is filled in
+ if (shipcall.tidal_window_from is pd.NaT) or (shipcall.tidal_window_to is pd.NaT) or (df_times.eta_berth is pd.NaT):
+ return (StatusFlags.GREEN, None)
+
+ # check, whether the query time is between start & end time
+ # a violation is observed, when the is NOT between start & end
+ violation_state = not self.time_logic.time_inbetween(query_time=times_agency.eta_berth, start_time=shipcall.tidal_window_from, end_time=shipcall.tidal_window_to)
+
+ if violation_state:
+ validation_name = inspect.currentframe().f_code.co_name
+ return (StatusFlags.RED, validation_name)
+ else:
+ return (StatusFlags.GREEN, None)
+
+ def validation_rule_fct_etd_time_not_in_tidal_window(self, shipcall, df_times, *args, **kwargs):
+ """
+ Code: #0004-B
+ Type: Local Rule
+ Description: this validation checks, whether the ETD time is between the provided tidal window
+
+ query time: eta_berth (times_agency)
+ start_time & end_time: tidal_window_from & tidal_window_to (shipcall)
+ """
+ # check, if the header is filled in (agency)
+ if len(df_times.loc[df_times["participant_type"].isin([ParticipantType.AGENCY.value])]) != 1:
+ return (StatusFlags.GREEN, None)
+ times_agency = self.sql_handler.get_times_for_participant_type(df_times, participant_type=ParticipantType.AGENCY)
+
+ # requirements: tidal window (from & to) is filled in
+ if (shipcall.tidal_window_from is pd.NaT) or (shipcall.tidal_window_to is pd.NaT) or (df_times.eta_berth is pd.NaT):
+ return (StatusFlags.GREEN, None)
+
+ # check, whether the query time is between start & end time
+ # a violation is observed, when the is NOT between start & end
+ violation_state = not self.time_logic.time_inbetween(query_time=times_agency.etd_berth, start_time=shipcall.tidal_window_from, end_time=shipcall.tidal_window_to)
+
+ if violation_state:
+ validation_name = inspect.currentframe().f_code.co_name
+ return (StatusFlags.RED, validation_name)
+ else:
+ return (StatusFlags.GREEN, None)
+
+ def validation_rule_fct_too_many_identical_eta_times(self, shipcall, df_times, rounding = "min", maximum_threshold = 3, *args, **kwargs):
+ """
+ Code: #0005-A
+ Type: Global Rule
+ Description: this validation rule checks, whether there are too many shipcalls with identical ETA times.
+ """
+ # when ANY of the unique values exceeds the threshold, a violation is observed
+ query = "eta_berth"
+ violation_state = self.check_unique_shipcall_counts(query, rounding=rounding, maximum_threshold=maximum_threshold)
+
+ if violation_state:
+ validation_name = inspect.currentframe().f_code.co_name
+ return (StatusFlags.YELLOW, validation_name)
+ else:
+ return (StatusFlags.GREEN, None)
+
+ def validation_rule_fct_too_many_identical_etd_times(self, shipcall, df_times, rounding = "min", maximum_threshold = 3, *args, **kwargs):
+ """
+ Code: #0005-B
+ Type: Global Rule
+ Description: this validation rule checks, whether there are too many shipcalls with identical ETD times.
+ """
+ # when ANY of the unique values exceeds the threshold, a violation is observed
+ query = "etd_berth"
+ violation_state = self.check_unique_shipcall_counts(query, rounding=rounding, maximum_threshold=maximum_threshold)
+
+ if violation_state:
+ validation_name = inspect.currentframe().f_code.co_name
+ return (StatusFlags.YELLOW, validation_name)
+ else:
+ return (StatusFlags.GREEN, None)
+
+ def validation_rule_fct_agency_and_terminal_berth_id_disagreement(self, shipcall, df_times, *args, **kwargs):
+ """
+ Code: #0006-A
+ Type: Local Rule
+ Description: This validation rule checks, whether agency and terminal agree with their designated berth place by checking berth_id.
+ """
+ # check, if the header is filled in (agency & terminal)
+ if len(df_times.loc[df_times["participant_type"].isin([ParticipantType.AGENCY.value, ParticipantType.TERMINAL.value])]) != 2:
+ return (StatusFlags.GREEN, None)
+
+ times_agency = self.sql_handler.get_times_for_participant_type(df_times, participant_type=ParticipantType.AGENCY.value)
+ times_terminal = self.sql_handler.get_times_for_participant_type(df_times, participant_type=ParticipantType.TERMINAL.value)
+
+ violation_state = times_agency.berth_id!=times_terminal.berth_id
+
+ if violation_state:
+ validation_name = inspect.currentframe().f_code.co_name
+ return (StatusFlags.YELLOW, validation_name)
+ else:
+ return (StatusFlags.GREEN, None)
+
+ def validation_rule_fct_agency_and_terminal_pier_side_disagreement(self, shipcall, df_times, *args, **kwargs):
+ """
+ Code: #0006-B
+ Type: Local Rule
+ Description: This validation rule checks, whether agency and terminal agree with their designated pier side by checking pier_side.
+ """
+ # check, if the header is filled in (agency & terminal)
+ if len(df_times.loc[df_times["participant_type"].isin([ParticipantType.AGENCY.value, ParticipantType.TERMINAL.value])]) != 2:
+ return (StatusFlags.GREEN, None)
+
+ times_agency = self.sql_handler.get_times_for_participant_type(df_times, participant_type=ParticipantType.AGENCY.value)
+ times_terminal = self.sql_handler.get_times_for_participant_type(df_times, participant_type=ParticipantType.TERMINAL.value)
+
+ violation_state = times_agency.pier_side!=times_terminal.pier_side
+
+ if violation_state:
+ validation_name = inspect.currentframe().f_code.co_name
+ return (StatusFlags.YELLOW, validation_name)
+ else:
+ return (StatusFlags.GREEN, None)
+
+
diff --git a/src/lib_brecal_utils/brecal_utils/validators/validation_rules.py b/src/lib_brecal_utils/brecal_utils/validators/validation_rules.py
new file mode 100644
index 0000000..3ee7a46
--- /dev/null
+++ b/src/lib_brecal_utils/brecal_utils/validators/validation_rules.py
@@ -0,0 +1,127 @@
+import copy
+import numpy as np
+import pandas as pd
+from brecal_utils.database.enums import StatusFlags
+from brecal_utils.validators.validation_rule_functions import ValidationRuleFunctions
+from BreCal.schemas.model import Shipcall
+
+
+class ValidationRules(ValidationRuleFunctions):
+ """
+ An object that determines the traffic light state for validation and notification. The provided feedback ('green', 'yellow', 'red')
+ determines, whether the state is critical. It uses ValidationRuleState enumerations.
+ In case of a critical validation state, the user's input prompt may be interrupted and the user may be warned.
+ In case of a critical notification state, the respective users will be automatically notified after n seconds. (#TODO_n_seconds_delay)
+ """
+ def __init__(self, sql_handler): # use the entire data that is provided for this query (e.g., json input)
+ super().__init__(sql_handler)
+
+ self.validation_state = self.determine_validation_state()
+ # currently flagged: notification_state initially was based on using one ValidationRules object for each query. This is deprecated.
+ # self.notification_state = self.determine_notification_state() # (state:str, should_notify:bool)
+ return
+
+ def evaluate(self, shipcall):
+ """
+ 1.) prepare df_times, which every validation rule tends to use
+ calling this only once saves a lot of computational overhead
+ 2.) apply all validation rules
+ returns: (evaluation_state, violations)
+ """
+ # prepare df_times, which every validation rule tends to use
+ df_times = self.sql_handler.df_dict.get('times') # -> pd.DataFrame
+
+ # filter by shipcall id
+ df_times = self.sql_handler.get_times_of_shipcall(shipcall)
+
+ # apply all validation rules
+ # list of tuples, where each element is (state, msg)
+ evaluation_results = [elem(shipcall, df_times) for elem in self.get_validation_rule_functions()]
+
+ # filter out all 'None' results, which indicate that no violation occured.
+ evaluation_results = [evaluation_result for evaluation_result in evaluation_results if evaluation_result[1] is not None]
+
+ """ # deprecated
+ # check, if ANY of the evaluation results (evaluation_state) is larger than the .GREEN state. This means, that .YELLOW and .RED
+ # would return 'True'. Numpy arrays and functions are used to accelerate the comparison.
+ # np.any returns a boolean.
+ #evaluation_state = not np.any(np.greater(np.array([result[0] for result in evaluation_results]), ValidationRuleState.GREEN))
+ """
+ # check, what the maximum state flag is and return it
+ evaluation_state = np.max(np.array([result[0] for result in evaluation_results])) if len(evaluation_results)>0 else 1
+ return (evaluation_state, evaluation_results)
+
+ def evaluation_verbosity(self, evaluation_state, evaluation_results):
+ """This function suggestions verbosity for the evaluation results. Based on 'True'/'False' evaluation outcome, the returned string is different."""
+ if evaluation_state:
+ return f"OK! The validation was successful. There are no rule violations."
+ else:
+ verbose_string = "These are:" + "\n\t".join(evaluation_results) # every element of the list will be displayed in a new line with a tab
+ return f"FAILED VALIDATION. There have been {len(evaluation_results)} violations. {verbose_string}"
+
+ def evaluate_shipcall_from_df(self, x):
+ shipcall = Shipcall(**{**{'id':x.name}, **x.to_dict()})
+ evaluation_state, violations = self.evaluate(shipcall)
+ return evaluation_state, violations
+
+ def evaluate_shipcalls(self, shipcall_df:pd.DataFrame)->pd.DataFrame:
+ """apply 'evaluate_shipcall_from_df' to each individual shipcall in {shipcall_df}. Returns shipcall_df ('evaluation' and 'evaluation_message' are updated)"""
+ results = shipcall_df.apply(lambda x: self.evaluate_shipcall_from_df(x), axis=1).values
+
+ # unbundle individual results. evaluation_state becomes an integer, violation
+ evaluation_state = [StatusFlags(res[0]).value for res in results]
+ violations = [",".join(res[1]) if len(res[1])>0 else None for res in results]
+
+ shipcall_df.loc[:,"evaluation"] = evaluation_state
+ shipcall_df.loc[:,"evaluation_message"] = violations
+ return shipcall_df
+
+ def determine_validation_state(self) -> str:
+ """
+ this method determines the validation state of a shipcall. The state is either ['green', 'yellow', 'red'] and signals,
+ whether an entry causes issues within the workflow of users.
+
+ returns: validation_state_new (str)
+ """
+ (validation_state_new, description) = self.undefined_method()
+ # should there also be notifications for critical validation states? In principle, the traffic light itself provides that notification.
+ self.validation_state = validation_state_new
+ return validation_state_new
+
+ def determine_notification_state(self) -> (str, bool):
+ """
+ this method determines state changes in the notification state. When the state is changed to yellow or red,
+ a user is notified about it. The only exception for this rule is when the state was yellow or red before,
+ as the user has then already been notified.
+
+ returns: notification_state_new (str), should_notify (bool)
+ """
+ (state_new, description) = self.undefined_method() # determine the successor
+ should_notify = self.identify_notification_state_change(state_new)
+ self.notification_state = state_new # overwrite the predecessor
+ return state_new, should_notify
+
+ def identify_notification_state_change(self, state_new) -> bool:
+ """
+ determines, whether the observed state change should trigger a notification.
+ internally, this function maps a color string to an integer and determines, if the successor state is more severe than the predecessor.
+
+ state changes trigger a notification in the following cases:
+ green -> yellow
+ green -> red
+ yellow -> red
+
+ (none -> yellow) or (none -> red)
+ due to the values in the enumeration objects, the states are mapped to provide this function.
+ green=1, yellow=2, red=3, none=1. Hence, critical changes can be observed by simply checking with "greater than".
+
+ returns bool, whether a notification should be triggered
+ """
+ # state_old is always considered at least 'Green' (1)
+ state_old = max(copy.copy(self.notification_state) if "notification_state" in list(self.__dict__.keys()) else StatusFlags.NONE, StatusFlags.GREEN.value)
+ return state_new.value > state_old.value
+
+ def undefined_method(self) -> str:
+ """this function should apply the ValidationRules to the respective .shipcall, in regards to .times"""
+ # #TODO_traffic_state
+ return (StatusFlags.GREEN, False) # (state:str, should_notify:bool)
diff --git a/src/lib_brecal_utils/setup.py b/src/lib_brecal_utils/setup.py
new file mode 100644
index 0000000..44b0590
--- /dev/null
+++ b/src/lib_brecal_utils/setup.py
@@ -0,0 +1,13 @@
+from setuptools import find_packages, setup
+
+package_name = "brecal_utils"
+
+exec(open(f'{package_name}/_version.py').read()) # obtains __version__
+setup(
+ name=package_name,
+ packages=find_packages(),
+ version=__version__,
+ description='initializing the library. testing initial imports and relations. the license type is to be determined',
+ author='Max Metz',
+ license='to be determined',
+)
diff --git a/src/lib_brecal_utils/tests/__init__.py b/src/lib_brecal_utils/tests/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/lib_brecal_utils/tests/stubs/__init__.py b/src/lib_brecal_utils/tests/stubs/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/lib_brecal_utils/tests/stubs/test_stub_objects.py b/src/lib_brecal_utils/tests/stubs/test_stub_objects.py
new file mode 100644
index 0000000..ccd1655
--- /dev/null
+++ b/src/lib_brecal_utils/tests/stubs/test_stub_objects.py
@@ -0,0 +1,60 @@
+import pytest
+
+def test_build_stub_berth():
+ from BreCal.schemas.model import Berth
+ from brecal_utils.stubs.berth import get_berth_simple
+ berth = get_berth_simple()
+ assert isinstance(berth, Berth)
+ return
+
+def test_build_stub_participant():
+ from BreCal.schemas.model import Participant
+ from brecal_utils.stubs.participant import get_participant_simple
+ participant = get_participant_simple()
+ assert isinstance(participant, Participant)
+ return
+
+def test_build_stub_user():
+ from BreCal.schemas.model import User
+ from brecal_utils.stubs.user import get_user_simple
+ user = get_user_simple()
+ assert isinstance(user, User)
+ return
+
+def test_build_stub_ship():
+ from BreCal.schemas.model import Ship
+ from brecal_utils.stubs.ship import get_ship_simple
+ ship = get_ship_simple()
+ assert isinstance(ship, Ship)
+ return
+
+def test_build_stub_shipcall():
+ from BreCal.schemas.model import Shipcall
+ from brecal_utils.stubs.shipcall import get_shipcall_simple
+ shipcall = get_shipcall_simple()
+ assert isinstance(shipcall, Shipcall)
+ return
+
+def test_build_stub_times():
+ from BreCal.schemas.model import Times
+ from brecal_utils.stubs.times_full import get_times_full_simple
+ times = get_times_full_simple()
+ assert isinstance(times, Times)
+ return
+
+def test_build_stub_notification():
+ from BreCal.schemas.model import Notification
+ from brecal_utils.stubs.notification import get_notification_simple
+ notification = get_notification_simple()
+ assert isinstance(notification, Notification)
+
+if __name__=="__main__":
+ test_build_stub_berth()
+ test_build_stub_participant()
+ test_build_stub_berth()
+ test_build_stub_user()
+ test_build_stub_ship()
+ test_build_stub_tug()
+ test_build_stub_shipcall()
+ test_build_stub_times()
+ test_build_stub_notification()
diff --git a/src/lib_brecal_utils/tests/test_import_modules.py b/src/lib_brecal_utils/tests/test_import_modules.py
new file mode 100644
index 0000000..cdbb942
--- /dev/null
+++ b/src/lib_brecal_utils/tests/test_import_modules.py
@@ -0,0 +1,102 @@
+import pytest
+
+def test_import_colorama():
+ """
+ colorama is used for 'pretty print' options, such as colored printing. For example, this is used in pytest-cov to quickly
+ highlight passing and failing tests
+ """
+ import colorama
+ return
+
+def test_import_matplotlib():
+ """matplotlib is used for visualizations (e.g. images and graphs)"""
+ import matplotlib
+ return
+
+def test_import_matplotlib_pyplot():
+ """pyplot as a sub-library of matplotlib, which is used to plot images and graphs"""
+ import matplotlib.pyplot as plt
+ return
+
+def test_import_tqdm_tqdm():
+ """tqdm is a neat utility library for simple display of progress in loops"""
+ from tqdm import tqdm
+ return
+
+def test_import_pandas():
+ """pandas is useful to handle dataframes and read from .csv or .json files, which can be collected into joint DataFrame objects"""
+ import pandas as pd
+ return
+
+def test_import_flask():
+ """flask is a WSGI framework for quick and easy design of web-based applications"""
+ import flask
+ from flask import Flask, Blueprint, request
+ return
+
+def test_import_flask_specific_objects():
+ """common flask objects, such as the Flask api object, the Blueprint and requests"""
+ from flask import Flask, Blueprint, request
+ return
+
+
+
+def test_import_mysql_connector():
+ """the 'mysql.connector' Object is used for the BreCal server database"""
+ import mysql.connector
+ return
+
+def test_import_pydapper():
+ """is a library that provides convenient methods for database related work"""
+ import pydapper
+ return
+
+def test_import_webargs():
+ """currently used in ~/brecal/src/server/BreCal/api/berths.py"""
+ import webargs
+ from webargs.flaskparser import parser
+ return
+
+def test_import_mashmallow():
+ """currently used in ~/brecal/src/server/BreCal/api/shipcalls.py"""
+ import marshmallow
+ from marshmallow import Schema, fields
+ return
+
+def test_import_flask_jwt_extended():
+ """currently used in ~/brecal/src/server/BreCal/api/login.py"""
+ import flask_jwt_extended
+ from flask_jwt_extended import create_access_token
+ return
+
+def test_import_pyjwt():
+ """currently used in ~/brecal/src/server/BreCal/services/jwt_handler.py"""
+ import jwt
+ return
+
+def test_import_bcrypt():
+ """currently used in ~/brecal/src/server/BreCal/impl/login.py"""
+ import bcrypt
+ return
+
+def test_import_math():
+ """math.isclose can be interesting to measure differences between two times (e.g., to ignore milliseconds)"""
+ import math
+ math.isclose
+ return
+
+def test_import_datetime():
+ """datetime is the default library for times"""
+ import datetime
+ datetime.datetime.now()
+ return
+
+
+if __name__=="__main__":
+ test_import_colorama()
+ test_import_matplotlib()
+ test_import_matplotlib_pyplot()
+ test_import_tqdm_tqdm()
+ test_import_pandas()
+ test_import_flask()
+
diff --git a/src/lib_brecal_utils/tests/test_test_handling.py b/src/lib_brecal_utils/tests/test_test_handling.py
new file mode 100644
index 0000000..877766e
--- /dev/null
+++ b/src/lib_brecal_utils/tests/test_test_handling.py
@@ -0,0 +1,48 @@
+import unittest
+import pytest
+
+def test_execute_coverage_test():
+ """
+ executes {execute_coverage_test} to check, whether reporting works as expected
+ """
+ import os
+ import brecal_utils
+
+ from brecal_utils.file_handling import get_project_root
+ from brecal_utils.test_handling import execute_coverage_test
+
+ root_dir = brecal_utils.__file__
+ root_dir = get_project_root("lib_brecal_utils", root_dir=root_dir)
+
+ tests_path = os.path.join(root_dir, "tests")
+ coverage_path = os.path.join(root_dir, "brecal_utils")
+ report_path = os.path.join(root_dir, "coverage_reports")
+
+ with pytest.raises(KeyboardInterrupt, match="is_test_interrupt"):
+ execute_coverage_test(tests_path=tests_path, coverage_path=coverage_path, cov_report_dst_dir=report_path, cov_fail_under_rate=0, is_test=1)
+ return
+
+def test_execute_coverage_test_no_report():
+ """
+ executes {execute_coverage_test} to check, whether the function also works without reporting
+ """
+ import os
+ import brecal_utils
+
+ from brecal_utils.file_handling import get_project_root
+ from brecal_utils.test_handling import execute_coverage_test
+
+ root_dir = brecal_utils.__file__
+ root_dir = get_project_root("lib_brecal_utils", root_dir=root_dir)
+
+ tests_path = os.path.join(root_dir, "tests")
+ coverage_path = os.path.join(root_dir, "brecal_utils")
+ report_path = os.path.join(root_dir, "coverage_reports")
+
+ with pytest.raises(KeyboardInterrupt, match="is_test_interrupt"):
+ execute_coverage_test(tests_path=tests_path, coverage_path=coverage_path, cov_report_dst_dir=None, cov_fail_under_rate=0, is_test=1)
+ return
+
+
+if __name__=="__main__":
+ pass
diff --git a/src/lib_brecal_utils/tests/test_time_handling.py b/src/lib_brecal_utils/tests/test_time_handling.py
new file mode 100644
index 0000000..70135d5
--- /dev/null
+++ b/src/lib_brecal_utils/tests/test_time_handling.py
@@ -0,0 +1,50 @@
+import pytest
+
+def test_difference_to_then_tgt_time_none():
+ import math
+ import datetime
+ from brecal_utils import difference_to_then
+
+ difference_in_seconds = 42
+ event_time = datetime.datetime.now() - datetime.timedelta(seconds=difference_in_seconds)
+ event_time_diff = difference_to_then(event_time) # tgt_time = datetime.datetime.now()
+
+ # {difference_to_then} internally creates a .now() time, when the {then_time} is not defined
+ # hence, the difference will never be exactly 42 seconds due to slight latency
+ # math.isclose allows deviations up to 0.05 seconds
+ assert math.isclose(42, event_time_diff, abs_tol=0.05), f"both times are reasonably close"
+ return
+
+def test_difference_to_then_tgt_time_not_none():
+ import math
+ import datetime
+ from brecal_utils import difference_to_then
+
+ difference_in_seconds = 42
+ event_time = datetime.datetime(2000, 1, 1, 0, 0, 0)
+ tgt_time = event_time - datetime.timedelta(seconds=difference_in_seconds)
+ event_time_diff = difference_to_then(event_time, tgt_time)
+
+ # tgt time is -42 seconds, as it is 42 seconds before event_time
+ assert event_time_diff==-42, f"event time difference is incorrect"
+ return
+
+def test_difference_to_then_tgt_time_not_none_make_absolute():
+ import math
+ import datetime
+ from brecal_utils import difference_to_then
+
+ difference_in_seconds = 42
+ event_time = datetime.datetime(2000, 1, 1, 0, 0, 0)
+ tgt_time = event_time - datetime.timedelta(seconds=difference_in_seconds)
+ event_time_diff = difference_to_then(event_time, tgt_time, make_absolute=True) # difference: -42. make_absolute: +42
+
+ # tgt time is -42 seconds, as it is 42 seconds before event_time. However, we are interested in an absolute value
+ assert event_time_diff==42, f"event time difference is incorrect"
+ return
+
+
+if __name__=="__main__":
+ test_difference_to_then()
+
+
diff --git a/src/lib_brecal_utils/tests/validators/__init__.py b/src/lib_brecal_utils/tests/validators/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/lib_brecal_utils/tests/validators/test_input_validation.py b/src/lib_brecal_utils/tests/validators/test_input_validation.py
new file mode 100644
index 0000000..a962470
--- /dev/null
+++ b/src/lib_brecal_utils/tests/validators/test_input_validation.py
@@ -0,0 +1,63 @@
+import pytest
+
+@pytest.fixture()
+def build_input_validation():
+ from brecal_utils.validators.input_validation import InputValidation
+ iv = InputValidation()
+ return locals()
+
+
+def test_build_input_validation():
+ from brecal_utils.validators.input_validation import InputValidation
+ iv = InputValidation()
+ return
+
+def test_all_models_are_supported(build_input_validation):
+ iv = build_input_validation["iv"]
+
+ from brecal_utils.stubs.ship import get_ship_simple
+ ship = get_ship_simple()
+ iv.assert_if_not_supported(ship)
+
+ from brecal_utils.stubs.shipcall import get_shipcall_simple
+ shipcall = get_shipcall_simple()
+ iv.assert_if_not_supported(shipcall)
+
+ from brecal_utils.stubs.berth import get_berth_simple
+ berth = get_berth_simple()
+ iv.assert_if_not_supported(berth)
+
+ from brecal_utils.stubs.participant import get_participant_simple
+ participant = get_participant_simple()
+ iv.assert_if_not_supported(participant)
+
+ from brecal_utils.stubs.user import get_user_simple
+ user = get_user_simple()
+ iv.assert_if_not_supported(user)
+
+ # placeholder: how to handle times?
+ return
+
+def test_ship_input_validation(build_input_validation):
+ iv = build_input_validation["iv"]
+
+ from brecal_utils.stubs.ship import get_ship_simple
+ ship = get_ship_simple()
+ violations, state = iv.verify(ship)
+ assert state, f"found violations: {violations}"
+ return
+
+def test_participant_input_validation(build_input_validation):
+ iv = build_input_validation["iv"]
+
+ from brecal_utils.stubs.participant import get_participant_simple
+ participant = get_participant_simple()
+ violations, state = iv.verify(participant)
+ assert state, f"found violations: {violations}"
+ return
+
+
+
+if __name__=="__main__":
+ pass
+
diff --git a/src/lib_brecal_utils/tests/validators/test_schema_validation_berth.py b/src/lib_brecal_utils/tests/validators/test_schema_validation_berth.py
new file mode 100644
index 0000000..900cfc6
--- /dev/null
+++ b/src/lib_brecal_utils/tests/validators/test_schema_validation_berth.py
@@ -0,0 +1,14 @@
+import pytest
+from brecal_utils.stubs.berth import get_berth_simple
+
+def test_berth():
+ with pytest.raises(ValueError, match="#TODO: copied from ships."):
+ berth = get_berth_simple()
+
+ raise ValueError("#TODO: copied from ships.")
+ from brecal_utils.validators.schema_validation import test____
+ ship = get_ship_simple()
+ ship.length = 234
+ assert ship_length_in_range(ship)[0], f"ship length must be between 0 and 500 meters"
+ return
+
diff --git a/src/lib_brecal_utils/tests/validators/test_schema_validation_participant.py b/src/lib_brecal_utils/tests/validators/test_schema_validation_participant.py
new file mode 100644
index 0000000..9c51f60
--- /dev/null
+++ b/src/lib_brecal_utils/tests/validators/test_schema_validation_participant.py
@@ -0,0 +1,28 @@
+import pytest
+from brecal_utils.stubs.participant import get_participant_simple
+
+def test_participant_postal_code_len_is_five():
+ from brecal_utils.validators.schema_validation import participant_postal_code_len_is_five
+
+ participant = get_participant_simple()
+ assert participant_postal_code_len_is_five(participant)[0], f"the postal code should be exactly 5 numbers"
+ return
+
+def test_participant_postal_code_len_is_six_should_assert():
+ from brecal_utils.validators.schema_validation import participant_postal_code_len_is_five
+
+ participant = get_participant_simple()
+ participant.postal_code = "123456"
+ with pytest.raises(AssertionError, match="the postal code should be exactly 5 numbers"):
+ assert participant_postal_code_len_is_five(participant)[0], f"the postal code should be exactly 5 numbers"
+ return
+
+# TODO_postal_code_zero -> assert? Is postal_code mandatory?
+
+
+
+
+if __name__=="__main__":
+ test_participant_postal_code_len_is_five()
+ test_participant_postal_code_len_is_six_should_assert()
+
diff --git a/src/lib_brecal_utils/tests/validators/test_schema_validation_ship.py b/src/lib_brecal_utils/tests/validators/test_schema_validation_ship.py
new file mode 100644
index 0000000..71ddd19
--- /dev/null
+++ b/src/lib_brecal_utils/tests/validators/test_schema_validation_ship.py
@@ -0,0 +1,270 @@
+import pytest
+from brecal_utils.stubs.ship import get_ship_simple
+
+def test_ship_length_valid_range_234_is_valid():
+ from brecal_utils.validators.schema_validation import ship_length_in_range
+ ship = get_ship_simple()
+ ship.length = 234
+ assert ship_length_in_range(ship)[0], f"ship length must be between 0 and 500 meters"
+ return
+
+def test_ship_length_maximum_not_valid_range():
+ from brecal_utils.validators.schema_validation import ship_length_in_range
+ ship = get_ship_simple()
+ ship.length = 500
+ with pytest.raises(AssertionError):
+ assert ship_length_in_range(ship)[0], f"ship length must be between 0 and 500 meters, but is 500"
+ return
+
+def test_ship_length_minimum_not_valid_range():
+ from brecal_utils.validators.schema_validation import ship_length_in_range
+ ship = get_ship_simple()
+ ship.length = 0
+ with pytest.raises(AssertionError):
+ assert ship_length_in_range(ship)[0], f"ship length must be between 0 and 500 meters, but is 0"
+ return
+
+def test_ship_width_valid_range_137_is_valid():
+ from brecal_utils.validators.schema_validation import ship_width_in_range
+ ship = get_ship_simple()
+ ship.width = 137
+ assert ship_width_in_range(ship)[0], f"ship width must be between 0 and 500 meters"
+ return
+
+def test_ship_width_maximum_not_valid_range():
+ from brecal_utils.validators.schema_validation import ship_width_in_range
+ ship = get_ship_simple()
+ ship.width = 500
+ with pytest.raises(AssertionError):
+ assert ship_width_in_range(ship)[0], f"ship width must be between 0 and 500 meters, but is 500"
+ return
+
+def test_ship_width_minimum_not_valid_range():
+ from brecal_utils.validators.schema_validation import ship_width_in_range
+ ship = get_ship_simple()
+ ship.width = 0
+ with pytest.raises(AssertionError):
+ assert ship_width_in_range(ship)[0], f"ship width must be between 0 and 500 meters, but is 0"
+ return
+
+# not tug: values can be None and raise no error
+def test_ship_bollard_pull_is_none_and_not_tug():
+ from brecal_utils.validators.schema_validation import ship_bollard_pull_is_none_or_in_range
+ ship = get_ship_simple()
+ ship.is_tug = False
+ ship.bollard_pull = None
+ assert ship_bollard_pull_is_none_or_in_range(ship)[0], f"the bollard_pull should either be undefined or between 0 and 500 meters"
+ return
+
+def test_ship_participant_id_is_none_and_not_tug():
+ from brecal_utils.validators.schema_validation import ship_participant_id_is_none_or_int
+ ship = get_ship_simple()
+ ship.is_tug = False
+ ship.participant_id = None
+ assert ship_participant_id_is_none_or_int(ship)[0], f"the participant_id should either be undefined or an integer id"
+ return
+
+def test_ship_max_draft_is_none_and_not_tug():
+ from brecal_utils.validators.schema_validation import ship_max_draft_is_none_or_in_range
+ ship = get_ship_simple()
+ ship.is_tug = False
+ ship.max_draft = None
+ assert ship_max_draft_is_none_or_in_range(ship)[0], f"the max_draft should either be undefined or between 0 and 20 meters"
+ return
+
+
+# tug: values must be set, and are set. all tests should be accepted without assertion
+def test_ship_is_tug_bollard_pull_is_not_none():
+ from brecal_utils.validators.schema_validation import ship_bollard_pull_is_none_or_in_range
+ ship = get_ship_simple()
+ ship.is_tug = True
+ ship.bollard_pull = 311
+ assert ship_bollard_pull_is_none_or_in_range(ship)[0], f"the bollard_pull should either be undefined or between 0 and 500 meters"
+ return
+
+
+def test_ship_is_tug_max_draft_is_not_none():
+ from brecal_utils.validators.schema_validation import ship_max_draft_is_none_or_in_range
+ ship = get_ship_simple()
+ ship.is_tug = True
+ ship.max_draft = 17
+ assert ship_max_draft_is_none_or_in_range(ship)[0], f"the max_draft should either be undefined or between 0 and 20 meters"
+ return
+
+def test_ship_is_tug_participant_id_is_not_none():
+ from brecal_utils.validators.schema_validation import ship_participant_id_is_none_or_int
+ from brecal_utils.stubs import generate_uuid1_int
+ ship = get_ship_simple()
+ ship.is_tug = True
+ ship.participant_id = generate_uuid1_int()
+ assert ship_participant_id_is_none_or_int(ship)[0], f"the participant_id should either be undefined or an integer id"
+ return
+
+def test_ship_is_tug_participant_id_is_str_and_fails():
+ from brecal_utils.validators.schema_validation import ship_participant_id_is_none_or_int
+ # note: this is an artificial test case. However, it ensures that operators using the backend cannot create an id incorrectly
+ from brecal_utils.stubs import generate_uuid1_int
+ ship = get_ship_simple()
+ ship.is_tug = True
+ ship.participant_id = str(generate_uuid1_int())
+ with pytest.raises(AssertionError):
+ assert ship_participant_id_is_none_or_int(ship)[0], f"the participant_id should either be None or int, but is str"
+ return
+
+
+# tug: values must be set, but are not. all tests should raise AssertionError
+def test_ship_is_tug_bollard_pull_but_is_none_fails():
+ from brecal_utils.validators.schema_validation import ship_bollard_pull_is_defined_or_is_not_tug
+ ship = get_ship_simple()
+ ship.is_tug = True
+ ship.bollard_pull = None
+ with pytest.raises(AssertionError):
+ assert ship_bollard_pull_is_defined_or_is_not_tug(ship)[0], f"the bollard_pull cannot be None, if the ship is a tug"
+ return
+
+def test_ship_is_tug_max_draft_but_is_none_fails():
+ from brecal_utils.validators.schema_validation import ship_max_draft_is_defined_or_is_not_tug
+ ship = get_ship_simple()
+ ship.is_tug = True
+ ship.max_draft = None
+ with pytest.raises(AssertionError):
+ assert ship_max_draft_is_defined_or_is_not_tug(ship)[0], f"the max_draft cannot be None, if the ship is a tug"
+ return
+
+def test_ship_is_tug_participant_id_but_is_none_fails():
+ from brecal_utils.validators.schema_validation import ship_participant_id_is_defined_or_is_not_tug
+ ship = get_ship_simple()
+ ship.is_tug = True
+ ship.participant_id = None
+ with pytest.raises(AssertionError):
+ assert ship_participant_id_is_defined_or_is_not_tug(ship)[0], f"the participant_id cannot be None, if the ship is a tug"
+ return
+
+
+# tug: values must be in valid range
+# # sequence: 1.) is valid, 2.) is too small, 3.) is too large
+def test_ship_is_tug_bollard_pull_in_range_311_valid():
+ from brecal_utils.validators.schema_validation import ship_bollard_pull_is_none_or_in_range
+ ship = get_ship_simple()
+ ship.is_tug = True
+ ship.bollard_pull = 311
+ assert ship_bollard_pull_is_none_or_in_range(ship)[0], f"the bollard_pull must be 00, f"must return at least one method!"
+ return
+
+
+
+
+def test_validation_rule_fct_agency_and_terminal_pier_side_disagreement(build_sql_proxy_connection):
+ """#0006-A validation_rule_fct_agency_and_terminal_pier_side_disagreement"""
+ import pandas as pd
+
+ from brecal_utils.stubs.times_full import get_times_full_simple
+ from brecal_utils.stubs.shipcall import get_shipcall_simple
+ from brecal_utils.database.enums import ParticipantType
+ from brecal_utils.database.enums import StatusFlags
+
+ vr = build_sql_proxy_connection["vr"]
+ shipcall = get_shipcall_simple()
+ t1 = get_times_full_simple()
+ t2 = get_times_full_simple()
+
+ # roles: agency & terminal
+ t1.participant_type = ParticipantType.AGENCY.value
+ t2.participant_type = ParticipantType.TERMINAL.value
+
+ # disagreement
+ t1.pier_side = True
+ t2.pier_side = False
+
+ time_objects = [t1, t2]
+ df_times = pd.DataFrame.from_records([to_.__dict__ for to_ in time_objects])
+ df_times.set_index('id',inplace=True)
+
+ (state, description) = vr.validation_rule_fct_agency_and_terminal_pier_side_disagreement(shipcall, df_times)
+ assert state.value > StatusFlags.GREEN.value, f"a violation must be identified"
+ assert description is not None, f"a violation description must be identified"
+ return
+
+
+def test_validation_rule_fct_agency_and_terminal_pier_side_agreement(build_sql_proxy_connection):
+ """#0006-A validation_rule_fct_agency_and_terminal_pier_side_disagreement"""
+ import pandas as pd
+
+ from brecal_utils.stubs.times_full import get_times_full_simple
+ from brecal_utils.stubs.shipcall import get_shipcall_simple
+ from brecal_utils.database.enums import ParticipantType
+ from brecal_utils.database.enums import StatusFlags
+
+ vr = build_sql_proxy_connection["vr"]
+ shipcall = get_shipcall_simple()
+ t1 = get_times_full_simple()
+ t2 = get_times_full_simple()
+
+ # roles: agency & terminal
+ t1.participant_type = ParticipantType.AGENCY.value
+ t2.participant_type = ParticipantType.TERMINAL.value
+
+ # agreement
+ t1.pier_side = True
+ t2.pier_side = True
+
+ time_objects = [t1, t2]
+ df_times = pd.DataFrame.from_records([to_.__dict__ for to_ in time_objects])
+ df_times.set_index('id',inplace=True)
+
+ (state, description) = vr.validation_rule_fct_agency_and_terminal_pier_side_disagreement(shipcall, df_times)
+ assert state.value == StatusFlags.GREEN.value, f"no violation should be observed"
+ assert description is None, f"no violation should be observed"
+ return
+
+
+
+
+def test_validation_rule_fct_agency_and_terminal_berth_id_disagreement(build_sql_proxy_connection):
+ """#0006-B validation_rule_fct_agency_and_terminal_pier_side_disagreement"""
+ import pandas as pd
+
+ from brecal_utils.stubs.times_full import get_times_full_simple
+ from brecal_utils.stubs.shipcall import get_shipcall_simple
+ from brecal_utils.database.enums import ParticipantType
+ from brecal_utils.database.enums import StatusFlags
+
+ vr = build_sql_proxy_connection["vr"]
+ shipcall = get_shipcall_simple()
+ t1 = get_times_full_simple()
+ t2 = get_times_full_simple()
+
+ # roles: agency & terminal
+ t1.participant_type = ParticipantType.AGENCY.value
+ t2.participant_type = ParticipantType.TERMINAL.value
+
+ # disagreement
+ t1.berth_id = 1
+ t2.berth_id = 2
+
+ time_objects = [t1, t2]
+ df_times = pd.DataFrame.from_records([to_.__dict__ for to_ in time_objects])
+ df_times.set_index('id',inplace=True)
+
+ (state, description) = vr.validation_rule_fct_agency_and_terminal_berth_id_disagreement(shipcall, df_times)
+ assert state.value > StatusFlags.GREEN.value, f"a violation must be identified"
+ assert description is not None, f"a violation description must be identified"
+ return
+
+def test_validation_rule_fct_agency_and_terminal_berth_id_agreement(build_sql_proxy_connection):
+ """#0006-B validation_rule_fct_agency_and_terminal_pier_side_disagreement"""
+ import pandas as pd
+
+ from brecal_utils.stubs.times_full import get_times_full_simple
+ from brecal_utils.stubs.shipcall import get_shipcall_simple
+ from brecal_utils.database.enums import ParticipantType
+ from brecal_utils.database.enums import StatusFlags
+
+ vr = build_sql_proxy_connection["vr"]
+ shipcall = get_shipcall_simple()
+ t1 = get_times_full_simple()
+ t2 = get_times_full_simple()
+
+ # roles: agency & terminal
+ t1.participant_type = ParticipantType.AGENCY.value
+ t2.participant_type = ParticipantType.TERMINAL.value
+
+ # agreement
+ t1.berth_id = 21
+ t2.berth_id = 21
+
+ time_objects = [t1, t2]
+ df_times = pd.DataFrame.from_records([to_.__dict__ for to_ in time_objects])
+ df_times.set_index('id',inplace=True)
+
+ (state, description) = vr.validation_rule_fct_agency_and_terminal_berth_id_disagreement(shipcall, df_times)
+ assert state.value == StatusFlags.GREEN.value, f"no violation should be observed"
+ assert description is None, f"no violation should be observed"
+ return
+
+
+
+
diff --git a/src/lib_brecal_utils/tests/validators/test_validation_rule_state.py b/src/lib_brecal_utils/tests/validators/test_validation_rule_state.py
new file mode 100644
index 0000000..3ea2281
--- /dev/null
+++ b/src/lib_brecal_utils/tests/validators/test_validation_rule_state.py
@@ -0,0 +1,26 @@
+import pytest
+from brecal_utils.database.enums import StatusFlags
+
+def test_validation_rule_state_green_is_1():
+ assert StatusFlags.GREEN.value==1
+ return
+
+def test_validation_rule_state_yellow_is_2():
+ assert StatusFlags.YELLOW.value==2
+ return
+
+def test_validation_rule_state_red_is_3():
+ assert StatusFlags.RED.value==3
+ return
+
+def test_validation_rule_state_order():
+ # Red 3, Yellow 2, Green 1, None 0
+ # red>yellow>green>none
+ assert StatusFlags.RED.value>StatusFlags.YELLOW.value
+ assert StatusFlags.YELLOW.value>StatusFlags.GREEN.value
+ assert StatusFlags.GREEN.value>StatusFlags.NONE.value
+ return
+
+if __name__=="__main__":
+ pass
+
diff --git a/src/server/requirements.txt b/src/server/requirements.txt
index ca817e1..9c7b54f 100644
--- a/src/server/requirements.txt
+++ b/src/server/requirements.txt
@@ -10,4 +10,9 @@ pydapper[mysql-connector-python]
marshmallow-dataclass
bcrypt
jwt
-flask-jwt-extended
\ No newline at end of file
+flask-jwt-extended
+SQLAlchemy
+numpy
+pandas
+
+
diff --git a/src/server/tests/__init__.py b/src/server/tests/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/server/tests/api/__init__.py b/src/server/tests/api/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/server/tests/impl/__init__.py b/src/server/tests/impl/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/server/tests/schemas/__init__.py b/src/server/tests/schemas/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/server/tests/services/__init__.py b/src/server/tests/services/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/server/tests/test_create_app.py b/src/server/tests/test_create_app.py
new file mode 100644
index 0000000..c119bfe
--- /dev/null
+++ b/src/server/tests/test_create_app.py
@@ -0,0 +1,21 @@
+import pytest
+
+def test_create_app():
+ """
+
+ """
+ import os
+ import sys
+ from brecal_utils import get_project_root
+
+ project_root = get_project_root("brecal")
+ lib_location = os.path.join(project_root, "src", "server")
+ sys.path.append(lib_location)
+
+ from BreCal import create_app
+ os.chdir(project_root) # set the current directory to ~/brecal, so the config is found
+ application = create_app()
+ return
+
+if __name__=="__main__":
+ test_create_app()