Compare commits


No commits in common. "develop" and "v1.7.0_devel" have entirely different histories.

9 changed files with 66 additions and 168 deletions

View File

@@ -1,21 +0,0 @@
# Tests
___
## schemathesis
The directory ./src/server/tests contains the test cases that Max set up during the first implementation of the validation.
These are currently not "active", i.e. not yet cleaned up.
I would like to use an automated framework, but one that is only run "manually"; changes to the API are rare.
Current status (7.1.26):
The following can be run from the directory ./src/server/tests:
```pytest -q --maxfail=1 contract/test_openapi_fuzz.py```
This only works if schemathesis and hypothesis are installed in the local _venv_ in the matching(!) versions.
Currently I have schemathesis ("latest") and hypothesis 6.120.0:
```pip install "hypothesis==6.120.0"```
Unfortunately it has to be pinned this rigidly because of the dependencies.
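Because the suite is only run manually, a drifted local venv is easy to miss. A minimal guard sketch that could sit in the tests' conftest.py (not part of this changeset; it assumes the pin stays at 6.120.0):

```python
# Sketch (assumption): abort the manual contract-test run early if the locally
# installed hypothesis does not match the pin documented above.
from importlib.metadata import version

import pytest

EXPECTED_HYPOTHESIS = "6.120.0"


def pytest_sessionstart(session):
    installed = version("hypothesis")
    if installed != EXPECTED_HYPOTHESIS:
        pytest.exit(
            f"hypothesis {installed} is installed, but the contract tests are "
            f"pinned to {EXPECTED_HYPOTHESIS}",
            returncode=1,
        )
```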

View File

@@ -136,7 +136,7 @@ def PostShipcalls(schemaModel):
new_id = commands.execute_scalar("select last_insert_id()")
shipdata = get_ship_data_for_id(schemaModel["ship_id"])
message = "The participant has been assigned to the shipcall."
message = shipdata['name']
if "type_value" in schemaModel:
match schemaModel["type_value"]:
case 1:
@@ -206,7 +206,7 @@ def PutShipcalls(schemaModel, original_payload=None):
theshipcall = commands.query_single_or_default("SELECT * FROM shipcall where id = ?id?", sentinel, param={"id" : schemaModel["id"]})
if theshipcall is sentinel:
return json.dumps("no such record"), 404, {'Content-Type': 'application/json; charset=utf-8'}
was_canceled = theshipcall["canceled"]
provided_keys = set(original_payload.keys()) if isinstance(original_payload, dict) else None
@@ -242,6 +242,20 @@ def PutShipcalls(schemaModel, original_payload=None):
query = "UPDATE shipcall SET " + ", ".join(update_clauses) + " WHERE id = ?id?"
commands.execute(query, param=schemaModel)
ship_id_value = schemaModel.get("ship_id") if (provided_keys is None or "ship_id" in provided_keys) else theshipcall["ship_id"]
shipdata = get_ship_data_for_id(ship_id_value)
message = shipdata['name']
type_value = schemaModel.get("type_value") if (provided_keys is None or "type" in provided_keys) else theshipcall["type"]
if type_value is not None:
match type_value:
case 1:
message += " [ARRIVAL]"
case 2:
message += " [DEPARTURE]"
case 3:
message += " [SHIFTING]"
# pquery = SQLQuery.get_shipcall_participant_map_by_shipcall_id()
pquery = "SELECT id, participant_id, type FROM shipcall_participant_map where shipcall_id = ?id?"
pdata = commands.query(pquery,param={"id" : schemaModel["id"]}) # existing list of assignments
@@ -249,7 +263,7 @@ def PutShipcalls(schemaModel, original_payload=None):
if schemaModel.get("participants") is None:
schemaModel["participants"] = []
# loop across passed participant ids, creating entries for those not present in pdata
# loop across passed participant ids, creating entries for those not present in pdata
existing_notifications = get_notification_for_shipcall_and_type(schemaModel["id"], 1) # type = 1 is assignment
@@ -260,7 +274,6 @@ def PutShipcalls(schemaModel, original_payload=None):
found_participant = True
break
if not found_participant:
message = "The participant has been assigned to the shipcall."
# nquery = SQLQuery.get_shipcall_post_update_shipcall_participant_map()
spquery = "INSERT INTO shipcall_participant_map (shipcall_id, participant_id, type) VALUES (?shipcall_id?, ?participant_id?, ?type?)"
commands.execute(spquery, param={"shipcall_id" : schemaModel["id"], "participant_id" : participant_assignment["participant_id"], "type" : participant_assignment["type"]})
@@ -294,20 +307,18 @@ def PutShipcalls(schemaModel, original_payload=None):
commands.execute(nquery, param={"nid" : existing_notification["id"]})
else:
# create un-assignment notification
message = "The participant has been unassigned from the shipcall."
nquery = "INSERT INTO notification (shipcall_id, participant_id, level, type, message) VALUES (?shipcall_id?, ?participant_id?, 0, 5, ?message?)"
commands.execute(nquery, param={"shipcall_id" : schemaModel["id"], "participant_id" : elem["participant_id"], "message" : message})
break
canceled_value = schemaModel.get("canceled")
if canceled_value is not None:
if canceled_value and not was_canceled:
message = "The shipcall has been canceled."
# create a canceled notification for all currently assigned participants
stornoNotificationQuery = "INSERT INTO notification (shipcall_id, participant_id, level, type, message) VALUES (?shipcall_id?, ?participant_id?, 0, 7, ?message?)"
for participant_assignment in schemaModel["participants"]:
commands.execute(stornoNotificationQuery, param={"shipcall_id" : schemaModel["id"], "participant_id" : participant_assignment["participant_id"], "message" : message})
# save history data
# TODO: set ETA properly
# query = SQLQuery.create_sql_query_history_put()

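Both PostShipcalls and PutShipcalls now build the notification message inline from the ship's name plus a type suffix. For reference, the shared piece could be captured in a small helper; a minimal sketch (the helper name build_shipcall_message is hypothetical, not part of the diff):

```python
# Hypothetical helper mirroring the inline logic above: the notification
# message is the ship's name, optionally suffixed by the shipcall type.
_TYPE_SUFFIXES = {1: " [ARRIVAL]", 2: " [DEPARTURE]", 3: " [SHIFTING]"}


def build_shipcall_message(ship_name: str, type_value: int | None) -> str:
    return ship_name + _TYPE_SUFFIXES.get(type_value, "")
```

Usage would then be e.g. `message = build_shipcall_message(shipdata['name'], schemaModel.get("type_value"))`.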
View File

@@ -314,8 +314,7 @@ def eval_next_24_hrs():
found_notification = True
break
if not found_notification:
message = "The shipcall is scheduled to arrive/depart within the next 24 hours."
commands.execute(nquery, param={"shipcall_id":shipcall["id"], "participant_id": participant["participant_id"], "message":message})
commands.execute(nquery, param={"shipcall_id":shipcall["id"], "participant_id": participant["participant_id"], "message":shipcall["name"]})
except Exception as ex:
logging.error(ex)

View File

@@ -5,13 +5,11 @@ import re
import numpy as np
import pandas as pd
import datetime
import threading
from BreCal.database.enums import StatusFlags
from BreCal.validators.validation_rule_functions import ValidationRuleFunctions
from BreCal.schemas.model import Shipcall
from BreCal.local_db import getPoolConnection
_evaluation_lock = threading.Lock()
class ValidationRules(ValidationRuleFunctions):
"""
@@ -76,52 +74,37 @@ class ValidationRules(ValidationRuleFunctions):
return evaluation_state, violations
def evaluate_shipcalls(self, shipcall_df:pd.DataFrame)->pd.DataFrame:
"""apply 'evaluate_shipcall_from_df' to each individual shipcall in {shipcall_df}. Returns shipcall_df ('evaluation', 'evaluation_message', 'evaluation_time' and 'evaluation_notifications_sent' are updated)"""
evaluation_states_old = [state_old for state_old in shipcall_df.loc[:,"evaluation"]]
evaluation_states_old = [state_old if not pd.isna(state_old) else 0 for state_old in evaluation_states_old]
results = shipcall_df.apply(lambda x: self.evaluate_shipcall_from_df(x), axis=1).values # returns tuple (state, message)
# Acquire lock to prevent race conditions during evaluation and notification creation
with _evaluation_lock:
"""apply 'evaluate_shipcall_from_df' to each individual shipcall in {shipcall_df}. Returnsshipcall_df ('evaluation', 'evaluation_message', 'evaluation_time' and 'evaluation_notifications_sent' are updated)"""
evaluation_states_old = [state_old for state_old in shipcall_df.loc[:,"evaluation"]]
evaluation_states_old = [state_old if not pd.isna(state_old) else 0 for state_old in evaluation_states_old]
results = shipcall_df.apply(lambda x: self.evaluate_shipcall_from_df(x), axis=1).values # returns tuple (state, message)
# unbundle individual results. evaluation_states becomes an integer, violation
evaluation_states_new = [StatusFlags(res[0]).value for res in results]
violations = [",\r\n".join(res[1]) if len(res[1])>0 else None for res in results]
violations = [self.concise_evaluation_message_if_too_long(violation) for violation in violations]
# unbundle individual results. evaluation_states becomes an integer, violation
evaluation_states_new = [StatusFlags(res[0]).value for res in results]
violations = [",\r\n".join(res[1]) if len(res[1])>0 else None for res in results]
violations = [self.concise_evaluation_message_if_too_long(violation) for violation in violations]
# build the list of evaluation times ('now', as isoformat)
#evaluation_time = self.get_notification_times(evaluation_states_new)
# build the list of evaluation times ('now', as isoformat)
#evaluation_time = self.get_notification_times(evaluation_states_new)
if evaluation_states_old is not None and evaluation_states_new is not None:
pooledConnection = None
participants = None
try:
pooledConnection = getPoolConnection()
commands = pydapper.using(pooledConnection)
for shipcall_id, state_old_raw, state_new_raw, violation in zip(shipcall_df.index, evaluation_states_old, evaluation_states_new, violations):
state_old = int(state_old_raw) if state_old_raw is not None else 0
state_new = int(state_new_raw) if state_new_raw is not None else 0
logging.info(f"Shipcall {shipcall_id}: state_old={state_old}, state_new={state_new}")
if state_old == state_new:
continue
if participants is None:
participant_query = "SELECT participant_id, type FROM shipcall_participant_map WHERE shipcall_id = ?shipcall_id?"
participants = [participant for participant in commands.query(participant_query, model=dict, param={"shipcall_id" : int(shipcall_id)}) if participant.get("type") != 1]
notification_type = 3 # RED (mapped to time_conflict)
send_notification = False
if state_new == 2:
match state_old:
send_notification = False
if evaluation_states_old is not None and evaluation_states_new is not None:
if len(evaluation_states_old) == 1 and len(evaluation_states_new) == 1:
if evaluation_states_old[0] != evaluation_states_new[0]:
pooledConnection = None
try:
pooledConnection = getPoolConnection()
commands = pydapper.using(pooledConnection)
notification_type = 3 # RED (mapped to time_conflict)
if evaluation_states_new[0] == 2:
match evaluation_states_old[0]:
case 0:
send_notification = True
case 1:
send_notification = True
notification_type = 6 # YELLOW (mapped to missing_data)
elif state_new == 3:
match state_old:
if evaluation_states_new[0] == 3:
match evaluation_states_old[0]:
case 0:
send_notification = True
case 1:
@@ -130,35 +113,33 @@ class ValidationRules(ValidationRuleFunctions):
send_notification = True
if send_notification:
logging.info(f"Creating notification(s) for shipcall {shipcall_id}, type={notification_type}")
query = f"INSERT INTO notification (shipcall_id, participant_id, type, level, message) VALUES (?shipcall_id?, ?participant_id?, {notification_type}, 0, ?message?)"
for participant in participants:
commands.execute(query, param={"shipcall_id" : int(shipcall_id), "participant_id" : participant["participant_id"], "message" : violation})
query = f"INSERT INTO notification (shipcall_id, type, level, message) VALUES (?shipcall_id?, {notification_type}, 0, ?message?)"
commands.execute(query, param={"shipcall_id" : int(shipcall_df.index[0]), "message" : violations[0]})
if state_new == 1 and state_old != 0: # this resolves the time conflict
logging.info(f"Resolving notifications for shipcall {shipcall_id}, type={notification_type}")
query = f"DELETE from notification where shipcall_id = ?shipcall_id? and type = {notification_type} and level = 0"
deleted_count = commands.execute(query, param={"shipcall_id" : int(shipcall_id)})
logging.info(f"Deleted {deleted_count} existing notifications (yet unsent)")
if deleted_count == 0:
query = "INSERT INTO notification (shipcall_id, participant_id, type, level) VALUES (?shipcall_id?, ?participant_id?, 4, 0)"
for participant in participants:
commands.execute(query, param={"shipcall_id" : int(shipcall_id), "participant_id" : participant["participant_id"]})
finally:
if pooledConnection is not None:
pooledConnection.close()
if evaluation_states_new[0] == 1 and evaluation_states_old[0] != 0: # this resolves the conflict
query = f"SELECT * from notification where shipcall_id = ?shipcall_id? and type = {notification_type} and level = 0"
existing_notification = commands.query(query, param={"shipcall_id" : int(shipcall_df.index[0])})
if len(existing_notification) > 0:
query = "DELETE from notification where id = ?id?"
commands.execute(query, param={"id" : existing_notification[0]["id"]})
else:
query = "INSERT INTO notification (shipcall_id, type, level) VALUES (?shipcall_id?, 4, 0)"
commands.execute(query, param={"shipcall_id" : int(shipcall_df.index[0])})
finally:
if pooledConnection is not None:
pooledConnection.close()
# build the list of 'evaluation_notifications_sent'. The value is 'False', when a notification should be created
#evaluation_notifications_sent = self.get_notification_states(evaluation_states_old, evaluation_states_new)
# build the list of 'evaluation_notifications_sent'. The value is 'False', when a notification should be created
#evaluation_notifications_sent = self.get_notification_states(evaluation_states_old, evaluation_states_new)
# TODO: detect evaluation state changes and create notifications
# TODO: detect evaluation state changes and create notifications
shipcall_df.loc[:,"evaluation"] = evaluation_states_new
shipcall_df.loc[:,"evaluation_message"] = violations
#shipcall_df.loc[:,"evaluation_time"] = evaluation_time
#shipcall_df.loc[:,"evaluation_notifications_sent"] = evaluation_notifications_sent
return shipcall_df
shipcall_df.loc[:,"evaluation"] = evaluation_states_new
shipcall_df.loc[:,"evaluation_message"] = violations
#shipcall_df.loc[:,"evaluation_time"] = evaluation_time
#shipcall_df.loc[:,"evaluation_notifications_sent"] = evaluation_notifications_sent
return shipcall_df
def concise_evaluation_message_if_too_long(self, violation):
"""

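The two interleaved variants above gate notification creation on the same old/new evaluation-state transitions, which is hard to follow in diff form. As an editorial aid, here is a condensed sketch of just that decision, restricted to the transitions actually visible in these hunks (the helper name is hypothetical; cases falling outside the shown hunks are omitted):

```python
# Hypothetical condensation of the transition logic above (not part of the diff).
# Only branches visible in the shown hunks are reproduced.
def notification_for_transition(state_old: int, state_new: int) -> tuple[bool, int]:
    """Return (send_notification, notification_type) for an evaluation state change."""
    notification_type = 3  # RED (mapped to time_conflict)
    send_notification = False
    if state_new == 2:
        if state_old == 0:
            send_notification = True
        elif state_old == 1:
            send_notification = True
            notification_type = 6  # YELLOW (mapped to missing_data)
    elif state_new == 3:
        if state_old == 0:
            send_notification = True
        # state_old == 1 and later cases fall in lines omitted from this diff
    return send_notification, notification_type
```

Where the two variants then differ is in how the notification is written: one inserts a row per assigned participant, the other inserts a single row without a participant_id.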
View File

@@ -1,6 +0,0 @@
[project.optional-dependencies]
test = [
    "pytest>=7.0",
    "schemathesis>=3.0",
    "requests>=2.31",
]

View File

@@ -20,4 +20,4 @@ pytest
pytest-cov
coverage
../server/.

View File

@@ -1,45 +0,0 @@
import os
import pytest
import requests


@pytest.fixture(scope="session")
def base_url() -> str:
    # Example: https://dev.api.mycompany.com
    url = os.environ.get("API_BASE_URL")
    if not url:
        url = "http://neptun.fritz.box"
        # raise RuntimeError("Set API_BASE_URL")
    return url.rstrip("/")


@pytest.fixture(scope="session")
def login_payload() -> dict[str, str]:
    username = os.environ.get("API_USERNAME")
    if not username:
        username = "Londo"
    password = os.environ.get("API_PASSWORD")
    if not password:
        password = "Hallowach"
    # if not username or not password:
    #     raise RuntimeError("Set API_USERNAME and API_PASSWORD")
    return {"username": username, "password": password}


@pytest.fixture(scope="session")
def jwt_token(base_url: str, login_payload: dict[str, str]) -> str:
    # Adapt these to your auth endpoint + response shape:
    login_path = os.environ.get("API_LOGIN_PATH", "/login")
    resp = requests.post(
        f"{base_url}{login_path}",
        json=login_payload,
        timeout=30,
    )
    resp.raise_for_status()
    data = resp.json()
    token = data.get("access_token") or data.get("token")
    if not token:
        raise RuntimeError("Could not find JWT token in login response JSON")
    return token


@pytest.fixture(scope="session")
def auth_headers(jwt_token: str) -> dict[str, str]:
    return {"Authorization": f"Bearer {jwt_token}"}

View File

@@ -1,18 +0,0 @@
import schemathesis

schema = schemathesis.openapi.from_path("../../../misc/BreCalApi.yaml")


@schema.parametrize()
def test_api_conformance(
    case,
    base_url: str,
    auth_headers: dict[str, str],
    login_payload: dict[str, str],
) -> None:
    # Calls your real service:
    if case.path == "/login" and case.method.upper() == "POST":
        response = case.call(base_url=base_url, json=login_payload)
    else:
        response = case.call(base_url=base_url, headers=auth_headers)
    # Validates status code, headers, and body against the OpenAPI schema:
    case.validate_response(response)

View File

@@ -1,3 +0,0 @@
[pytest]
addopts = -ra
testpaths = tests