Compare commits

..

7 Commits

9 changed files with 168 additions and 66 deletions

21
misc/Readme_test.md Normal file
View File

@ -0,0 +1,21 @@
# Tests
___
## schemathesis
Unter dem Verzeichnis ./src/server/tests befinden sich die Test-Cases, die Max bei der ersten Implementierung der Validierung angelegt hat.
Diese sind derzeit nicht "aktiv", bzw. noch nicht bereinigt.
Ich möchte gern ein automatisches Framework verwenden, das aber nur "manuell" betrieben wird. Änderungen an der API sind selten.
Aktueller Stand (7.1.26):
Im Verzeichnis ./src/server/tests kann folgendes ausgeführt werden:
```pytest -q --maxfail=1 contract/test_openapi_fuzz.py```
Das Ganze funktioniert nur, wenn auch schemathesis und hypothesis in den passenden(!) Versionen im lokalen _venv_ installiert sind.
Aktuell habe ich schemathesis ("latest") und hypothesis 6.120.0:
```pip install "hypothesis==6.120.0"```
Das muss wegen dependencies so blöd gepinnt werden.

View File

@ -136,7 +136,7 @@ def PostShipcalls(schemaModel):
new_id = commands.execute_scalar("select last_insert_id()") new_id = commands.execute_scalar("select last_insert_id()")
shipdata = get_ship_data_for_id(schemaModel["ship_id"]) shipdata = get_ship_data_for_id(schemaModel["ship_id"])
message = shipdata['name'] message = "The participant has been assigned to the shipcall."
if "type_value" in schemaModel: if "type_value" in schemaModel:
match schemaModel["type_value"]: match schemaModel["type_value"]:
case 1: case 1:
@ -242,20 +242,6 @@ def PutShipcalls(schemaModel, original_payload=None):
query = "UPDATE shipcall SET " + ", ".join(update_clauses) + " WHERE id = ?id?" query = "UPDATE shipcall SET " + ", ".join(update_clauses) + " WHERE id = ?id?"
commands.execute(query, param=schemaModel) commands.execute(query, param=schemaModel)
ship_id_value = schemaModel.get("ship_id") if (provided_keys is None or "ship_id" in provided_keys) else theshipcall["ship_id"]
shipdata = get_ship_data_for_id(ship_id_value)
message = shipdata['name']
type_value = schemaModel.get("type_value") if (provided_keys is None or "type" in provided_keys) else theshipcall["type"]
if type_value is not None:
match type_value:
case 1:
message += " [ARRIVAL]"
case 2:
message += " [DEPARTURE]"
case 3:
message += " [SHIFTING]"
# pquery = SQLQuery.get_shipcall_participant_map_by_shipcall_id() # pquery = SQLQuery.get_shipcall_participant_map_by_shipcall_id()
pquery = "SELECT id, participant_id, type FROM shipcall_participant_map where shipcall_id = ?id?" pquery = "SELECT id, participant_id, type FROM shipcall_participant_map where shipcall_id = ?id?"
pdata = commands.query(pquery,param={"id" : schemaModel["id"]}) # existing list of assignments pdata = commands.query(pquery,param={"id" : schemaModel["id"]}) # existing list of assignments
@ -274,6 +260,7 @@ def PutShipcalls(schemaModel, original_payload=None):
found_participant = True found_participant = True
break break
if not found_participant: if not found_participant:
message = "The participant has been assigned to the shipcall."
# nquery = SQLQuery.get_shipcall_post_update_shipcall_participant_map() # nquery = SQLQuery.get_shipcall_post_update_shipcall_participant_map()
spquery = "INSERT INTO shipcall_participant_map (shipcall_id, participant_id, type) VALUES (?shipcall_id?, ?participant_id?, ?type?)" spquery = "INSERT INTO shipcall_participant_map (shipcall_id, participant_id, type) VALUES (?shipcall_id?, ?participant_id?, ?type?)"
commands.execute(spquery, param={"shipcall_id" : schemaModel["id"], "participant_id" : participant_assignment["participant_id"], "type" : participant_assignment["type"]}) commands.execute(spquery, param={"shipcall_id" : schemaModel["id"], "participant_id" : participant_assignment["participant_id"], "type" : participant_assignment["type"]})
@ -307,6 +294,7 @@ def PutShipcalls(schemaModel, original_payload=None):
commands.execute(nquery, param={"nid" : existing_notification["id"]}) commands.execute(nquery, param={"nid" : existing_notification["id"]})
else: else:
# create un-assignment notification # create un-assignment notification
message = "The participant has been unassigned from the shipcall."
nquery = "INSERT INTO notification (shipcall_id, participant_id, level, type, message) VALUES (?shipcall_id?, ?participant_id?, 0, 5, ?message?)" nquery = "INSERT INTO notification (shipcall_id, participant_id, level, type, message) VALUES (?shipcall_id?, ?participant_id?, 0, 5, ?message?)"
commands.execute(nquery, param={"shipcall_id" : schemaModel["id"], "participant_id" : elem["participant_id"], "message" : message}) commands.execute(nquery, param={"shipcall_id" : schemaModel["id"], "participant_id" : elem["participant_id"], "message" : message})
break break
@ -314,6 +302,7 @@ def PutShipcalls(schemaModel, original_payload=None):
canceled_value = schemaModel.get("canceled") canceled_value = schemaModel.get("canceled")
if canceled_value is not None: if canceled_value is not None:
if canceled_value and not was_canceled: if canceled_value and not was_canceled:
message = "The shipcall has been canceled."
# create a canceled notification for all currently assigned participants # create a canceled notification for all currently assigned participants
stornoNotificationQuery = "INSERT INTO notification (shipcall_id, participant_id, level, type, message) VALUES (?shipcall_id?, ?participant_id?, 0, 7, ?message?)" stornoNotificationQuery = "INSERT INTO notification (shipcall_id, participant_id, level, type, message) VALUES (?shipcall_id?, ?participant_id?, 0, 7, ?message?)"
for participant_assignment in schemaModel["participants"]: for participant_assignment in schemaModel["participants"]:

View File

@ -314,7 +314,8 @@ def eval_next_24_hrs():
found_notification = True found_notification = True
break break
if not found_notification: if not found_notification:
commands.execute(nquery, param={"shipcall_id":shipcall["id"], "participant_id": participant["participant_id"], "message":shipcall["name"]}) message = "The shipcall is scheduled to arrive/depart within the next 24 hours."
commands.execute(nquery, param={"shipcall_id":shipcall["id"], "participant_id": participant["participant_id"], "message":message})
except Exception as ex: except Exception as ex:
logging.error(ex) logging.error(ex)

View File

@ -5,11 +5,13 @@ import re
import numpy as np import numpy as np
import pandas as pd import pandas as pd
import datetime import datetime
import threading
from BreCal.database.enums import StatusFlags from BreCal.database.enums import StatusFlags
from BreCal.validators.validation_rule_functions import ValidationRuleFunctions from BreCal.validators.validation_rule_functions import ValidationRuleFunctions
from BreCal.schemas.model import Shipcall from BreCal.schemas.model import Shipcall
from BreCal.local_db import getPoolConnection from BreCal.local_db import getPoolConnection
_evaluation_lock = threading.Lock()
class ValidationRules(ValidationRuleFunctions): class ValidationRules(ValidationRuleFunctions):
""" """
@ -74,37 +76,52 @@ class ValidationRules(ValidationRuleFunctions):
return evaluation_state, violations return evaluation_state, violations
def evaluate_shipcalls(self, shipcall_df:pd.DataFrame)->pd.DataFrame: def evaluate_shipcalls(self, shipcall_df:pd.DataFrame)->pd.DataFrame:
"""apply 'evaluate_shipcall_from_df' to each individual shipcall in {shipcall_df}. Returns shipcall_df ('evaluation', 'evaluation_message', 'evaluation_time' and 'evaluation_notifications_sent' are updated)"""
evaluation_states_old = [state_old for state_old in shipcall_df.loc[:,"evaluation"]]
evaluation_states_old = [state_old if not pd.isna(state_old) else 0 for state_old in evaluation_states_old]
results = shipcall_df.apply(lambda x: self.evaluate_shipcall_from_df(x), axis=1).values # returns tuple (state, message)
# unbundle individual results. evaluation_states becomes an integer, violation # Acquire lock to prevent race conditions during evaluation and notification creation
evaluation_states_new = [StatusFlags(res[0]).value for res in results] with _evaluation_lock:
violations = [",\r\n".join(res[1]) if len(res[1])>0 else None for res in results] """apply 'evaluate_shipcall_from_df' to each individual shipcall in {shipcall_df}. Returnsshipcall_df ('evaluation', 'evaluation_message', 'evaluation_time' and 'evaluation_notifications_sent' are updated)"""
violations = [self.concise_evaluation_message_if_too_long(violation) for violation in violations] evaluation_states_old = [state_old for state_old in shipcall_df.loc[:,"evaluation"]]
evaluation_states_old = [state_old if not pd.isna(state_old) else 0 for state_old in evaluation_states_old]
results = shipcall_df.apply(lambda x: self.evaluate_shipcall_from_df(x), axis=1).values # returns tuple (state, message)
# build the list of evaluation times ('now', as isoformat) # unbundle individual results. evaluation_states becomes an integer, violation
#evaluation_time = self.get_notification_times(evaluation_states_new) evaluation_states_new = [StatusFlags(res[0]).value for res in results]
violations = [",\r\n".join(res[1]) if len(res[1])>0 else None for res in results]
violations = [self.concise_evaluation_message_if_too_long(violation) for violation in violations]
# build the list of evaluation times ('now', as isoformat)
#evaluation_time = self.get_notification_times(evaluation_states_new)
if evaluation_states_old is not None and evaluation_states_new is not None:
pooledConnection = None
participants = None
try:
pooledConnection = getPoolConnection()
commands = pydapper.using(pooledConnection)
for shipcall_id, state_old_raw, state_new_raw, violation in zip(shipcall_df.index, evaluation_states_old, evaluation_states_new, violations):
state_old = int(state_old_raw) if state_old_raw is not None else 0
state_new = int(state_new_raw) if state_new_raw is not None else 0
logging.info(f"Shipcall {shipcall_id}: state_old={state_old}, state_new={state_new}")
if state_old == state_new:
continue
if participants is None:
participant_query = "SELECT participant_id, type FROM shipcall_participant_map WHERE shipcall_id = ?shipcall_id?"
participants = [participant for participant in commands.query(participant_query, model=dict, param={"shipcall_id" : int(shipcall_id)}) if participant.get("type") != 1]
send_notification = False
if evaluation_states_old is not None and evaluation_states_new is not None:
if len(evaluation_states_old) == 1 and len(evaluation_states_new) == 1:
if evaluation_states_old[0] != evaluation_states_new[0]:
pooledConnection = None
try:
pooledConnection = getPoolConnection()
commands = pydapper.using(pooledConnection)
notification_type = 3 # RED (mapped to time_conflict) notification_type = 3 # RED (mapped to time_conflict)
if evaluation_states_new[0] == 2: send_notification = False
match evaluation_states_old[0]:
if state_new == 2:
match state_old:
case 0: case 0:
send_notification = True send_notification = True
case 1: case 1:
send_notification = True send_notification = True
notification_type = 6 # YELLOW (mapped to missing_data) notification_type = 6 # YELLOW (mapped to missing_data)
if evaluation_states_new[0] == 3: elif state_new == 3:
match evaluation_states_old[0]: match state_old:
case 0: case 0:
send_notification = True send_notification = True
case 1: case 1:
@ -113,33 +130,35 @@ class ValidationRules(ValidationRuleFunctions):
send_notification = True send_notification = True
if send_notification: if send_notification:
query = f"INSERT INTO notification (shipcall_id, type, level, message) VALUES (?shipcall_id?, {notification_type}, 0, ?message?)" logging.info(f"Creating notification(s) for shipcall {shipcall_id}, type={notification_type}")
commands.execute(query, param={"shipcall_id" : int(shipcall_df.index[0]), "message" : violations[0]}) query = f"INSERT INTO notification (shipcall_id, participant_id, type, level, message) VALUES (?shipcall_id?, ?participant_id?, {notification_type}, 0, ?message?)"
for participant in participants:
commands.execute(query, param={"shipcall_id" : int(shipcall_id), "participant_id" : participant["participant_id"], "message" : violation})
if evaluation_states_new[0] == 1 and evaluation_states_old[0] != 0: # this resolves the conflict if state_new == 1 and state_old != 0: # this resolves the time conflict
query = f"SELECT * from notification where shipcall_id = ?shipcall_id? and type = {notification_type} and level = 0" logging.info(f"Resolving notifications for shipcall {shipcall_id}, type={notification_type}")
existing_notification = commands.query(query, param={"shipcall_id" : int(shipcall_df.index[0])}) query = f"DELETE from notification where shipcall_id = ?shipcall_id? and type = {notification_type} and level = 0"
if len(existing_notification) > 0: deleted_count = commands.execute(query, param={"shipcall_id" : int(shipcall_id)})
query = "DELETE from notification where id = ?id?" logging.info(f"Deleted {deleted_count} existing notifications (yet unsent)")
commands.execute(query, param={"id" : existing_notification[0]["id"]}) if deleted_count == 0:
else: query = "INSERT INTO notification (shipcall_id, participant_id, type, level) VALUES (?shipcall_id?, ?participant_id?, 4, 0)"
query = "INSERT INTO notification (shipcall_id, type, level) VALUES (?shipcall_id?, 4, 0)" for participant in participants:
commands.execute(query, param={"shipcall_id" : int(shipcall_df.index[0])}) commands.execute(query, param={"shipcall_id" : int(shipcall_id), "participant_id" : participant["participant_id"]})
finally: finally:
if pooledConnection is not None: if pooledConnection is not None:
pooledConnection.close() pooledConnection.close()
# build the list of 'evaluation_notifications_sent'. The value is 'False', when a notification should be created # build the list of 'evaluation_notifications_sent'. The value is 'False', when a notification should be created
#evaluation_notifications_sent = self.get_notification_states(evaluation_states_old, evaluation_states_new) #evaluation_notifications_sent = self.get_notification_states(evaluation_states_old, evaluation_states_new)
# TODO: detect evaluation state changes and create notifications # TODO: detect evaluation state changes and create notifications
shipcall_df.loc[:,"evaluation"] = evaluation_states_new shipcall_df.loc[:,"evaluation"] = evaluation_states_new
shipcall_df.loc[:,"evaluation_message"] = violations shipcall_df.loc[:,"evaluation_message"] = violations
#shipcall_df.loc[:,"evaluation_time"] = evaluation_time #shipcall_df.loc[:,"evaluation_time"] = evaluation_time
#shipcall_df.loc[:,"evaluation_notifications_sent"] = evaluation_notifications_sent #shipcall_df.loc[:,"evaluation_notifications_sent"] = evaluation_notifications_sent
return shipcall_df return shipcall_df
def concise_evaluation_message_if_too_long(self, violation): def concise_evaluation_message_if_too_long(self, violation):
""" """

View File

@ -0,0 +1,6 @@
[project.optional-dependencies]
test = [
"pytest>=7.0",
"schemathesis>=3.0",
"requests>=2.31",
]

View File

@ -20,4 +20,4 @@ pytest
pytest-cov pytest-cov
coverage coverage
../server/.

View File

@ -0,0 +1,45 @@
import os
import pytest
import requests
@pytest.fixture(scope="session")
def base_url() -> str:
# Example: https://dev.api.mycompany.com
url = os.environ.get("API_BASE_URL")
if not url:
url = "http://neptun.fritz.box"
# raise RuntimeError("Set API_BASE_URL")
return url.rstrip("/")
@pytest.fixture(scope="session")
def login_payload() -> dict[str, str]:
username = os.environ.get("API_USERNAME")
if not username:
username = "Londo"
password = os.environ.get("API_PASSWORD")
if not password:
password = "Hallowach"
# if not username or not password:
# raise RuntimeError("Set API_USERNAME and API_PASSWORD")
return {"username": username, "password": password}
@pytest.fixture(scope="session")
def jwt_token(base_url: str, login_payload: dict[str, str]) -> str:
# Adapt these to your auth endpoint + response shape:
login_path = os.environ.get("API_LOGIN_PATH", "/login")
resp = requests.post(
f"{base_url}{login_path}",
json=login_payload,
timeout=30,
)
resp.raise_for_status()
data = resp.json()
token = data.get("access_token") or data.get("token")
if not token:
raise RuntimeError("Could not find JWT token in login response JSON")
return token
@pytest.fixture(scope="session")
def auth_headers(jwt_token: str) -> dict[str, str]:
return {"Authorization": f"Bearer {jwt_token}"}

View File

@ -0,0 +1,18 @@
import schemathesis
schema = schemathesis.openapi.from_path("../../../misc/BreCalApi.yaml")
@schema.parametrize()
def test_api_conformance(
case,
base_url: str,
auth_headers: dict[str, str],
login_payload: dict[str, str],
) -> None:
# Calls your real service:
if case.path == "/login" and case.method.upper() == "POST":
response = case.call(base_url=base_url, json=login_payload)
else:
response = case.call(base_url=base_url, headers=auth_headers)
# Validates status code, headers, and body against the OpenAPI schema:
case.validate_response(response)

View File

@ -0,0 +1,3 @@
[pytest]
addopts = -ra
testpaths = tests