Compare commits

..

No commits in common. "feature/transition_to_boolean" and "develop" have entirely different histories.

30 changed files with 123 additions and 519 deletions

2
.gitignore vendored
View File

@ -289,5 +289,3 @@ cython_debug/
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
src/notebooks_metz/
src/server/editable_requirements.txt
schemathesis_report.html

View File

@ -95,14 +95,6 @@ paths:
$ref: '#/components/responses/400'
'401':
$ref: '#/components/responses/401'
'404':
description: Not found
content:
application/json:
schema:
$ref: '#/components/schemas/Error'
example:
error_field: no such record
'500':
$ref: '#/components/responses/500'
'503':
@ -200,8 +192,6 @@ paths:
$ref: '#/components/responses/400'
'401':
$ref: '#/components/responses/401'
'409':
$ref: '#/components/responses/409'
'500':
$ref: '#/components/responses/500'
'503':
@ -258,8 +248,6 @@ paths:
$ref: '#/components/responses/400'
'401':
$ref: '#/components/responses/401'
'409':
$ref: '#/components/responses/409'
'500':
$ref: '#/components/responses/500'
'503':
@ -315,10 +303,6 @@ paths:
$ref: '#/components/responses/400'
'401':
$ref: '#/components/responses/401'
'404':
$ref: '#/components/responses/404'
'409':
$ref: '#/components/responses/409'
'500':
$ref: '#/components/responses/500'
'503':
@ -422,10 +406,6 @@ paths:
$ref: '#/components/responses/400'
'401':
$ref: '#/components/responses/401'
'404':
$ref: '#/components/responses/404'
'409':
$ref: '#/components/responses/409'
'500':
$ref: '#/components/responses/500'
'503':
@ -451,8 +431,6 @@ paths:
$ref: '#/components/responses/400'
'401':
$ref: '#/components/responses/401'
'404':
$ref: '#/components/responses/404'
'500':
$ref: '#/components/responses/500'
'503':
@ -470,10 +448,7 @@ paths:
required: false
description: '**Id of user**. *Example: 2*. User id returned by login call.'
schema:
oneOf:
- type: integer
- type: string
pattern: '^[0-9]+$'
type: integer
example: 2
responses:
'200':
@ -513,10 +488,7 @@ paths:
in: query
description: '**Id**. *Example: 42*. Id of referenced ship call.'
schema:
oneOf:
- type: integer
- type: string
pattern: '^[0-9]+$'
type: integer
example: 42
example: 42
responses:
@ -650,8 +622,6 @@ paths:
$ref: '#/components/responses/400'
'401':
$ref: '#/components/responses/401'
'404':
$ref: '#/components/responses/404'
'500':
$ref: '#/components/responses/500'
'503':
@ -676,8 +646,6 @@ paths:
$ref: '#/components/responses/400'
'401':
$ref: '#/components/responses/401'
'404':
$ref: '#/components/responses/404'
'500':
$ref: '#/components/responses/500'
'503':
@ -695,11 +663,7 @@ paths:
required: false
description: '**Id of participant**. *Example: 7*. Id of logged in participant.'
schema:
oneOf:
- type: integer
- type: string
pattern: '^[0-9]+$'
example: 7
$ref: '#/components/schemas/participant_id'
responses:
'200':
description: notification list
@ -836,14 +800,10 @@ components:
properties:
username:
type: string
minLength: 1
pattern: "\\S"
example: alfred
password:
type: string
format: password
minLength: 1
pattern: "\\S"
example: '123456'
required:
- username
@ -879,7 +839,6 @@ components:
shipcall:
type: object
description: Ship call data
additionalProperties: false
example:
id: 6
ship_id: 8
@ -999,8 +958,6 @@ components:
nullable: true
recommended_tugs:
type: integer
minimum: 0
maximum: 10
example: 2
nullable: true
anchored:
@ -1434,7 +1391,6 @@ components:
type: integer
nullable: true
example: 1234567
description: International Maritime Organization number, must be unique across all ships
callsign:
type: string
maxLength: 8
@ -1670,17 +1626,14 @@ components:
street:
type: string
maxLength: 128
nullable: true
example: Hermann-Hollerith-Str. 7
postal code:
type: string
maxLength: 5
nullable: true
example: '28359'
city:
type: string
maxLength: 64
nullable: true
example: Bremen
type:
type: integer
@ -1748,25 +1701,19 @@ components:
example: 5
first_name:
type: string
maxLength: 45
example: John
last_name:
type: string
maxLength: 45
example: Doe
user_name:
type: string
maxLength: 45
example: johndoe
user_phone:
type: string
nullable: true
maxLength: 32
example: '1234567890'
user_email:
type: string
format: email
maxLength: 64
nullable: true
example: no@where.com
notify_email:
@ -1839,14 +1786,13 @@ components:
maxLength: 45
example: Doe
user_phone:
maxLength: 32
maxLength: 128
type: string
nullable: true
example: '1234567890'
user_email:
maxLength: 64
maxLength: 128
type: string
format: email
nullable: true
example: no@where.com
notify_email:
@ -2071,17 +2017,8 @@ components:
schema:
$ref: '#/components/schemas/Error'
example:
error_field: No such record
error_description: The requested resource to update was not found
'409':
description: Conflict
content:
application/json:
schema:
$ref: '#/components/schemas/Error'
example:
error_field: imo
error_description: Resource already exists
error_field: shipcall_id
error_description: Ship call not found
'500':
description: Unexpected error
content:

View File

@ -19,9 +19,3 @@ Das Ganze funktioniert nur, wenn auch schemathesis und hypothesis in den passend
Aktuell habe ich schemathesis ("latest") und hypothesis 6.120.0:
```pip install "hypothesis==6.120.0"```
Das muss wegen dependencies so blöd gepinnt werden.
Damit pytest die API findet muss API_BASE_URL als Umgebungsvariable gesetzt werden. In Powershell z.B. so:
```powershell
$env:API_BASE_URL = "http://localhost:5000"
```

View File

@ -35,4 +35,3 @@ typing_extensions==4.12.2
tzdata==2024.1
webargs==8.6.0
Werkzeug==3.0.4
ntlm-auth==1.5.0

View File

@ -16,7 +16,6 @@ from .api import user
from .api import history
from .api import ports
from BreCal.schemas import defs
from BreCal.brecal_utils.file_handling import get_project_root, ensure_path
from BreCal.brecal_utils.test_handling import execute_test_with_pytest, execute_coverage_test
from BreCal.brecal_utils.time_handling import difference_to_then
@ -73,39 +72,21 @@ def create_app(test_config=None, instance_path=None):
app.register_blueprint(ports.bp)
log_level = getattr(logging, app.config.get("LOG_LEVEL", "DEBUG"))
log_format = "%(asctime)s [%(levelname)s] %(message)s"
root_logger = logging.getLogger()
for handler in list(root_logger.handlers):
root_logger.removeHandler(handler)
log_kwargs = {"format": "%(asctime)s | %(name)s | %(levelname)s | %(message)s"}
if app.config.get("LOG_TO_STDERR"):
handler = logging.StreamHandler(sys.stderr)
log_kwargs["stream"] = sys.stderr
else:
log_file = app.config.get("LOG_FILE", "brecaltest.log")
if not os.path.isabs(log_file):
log_file = os.path.join(app.instance_path, log_file)
log_dir = os.path.dirname(log_file)
if log_dir and not os.path.exists(log_dir):
os.makedirs(log_dir, exist_ok=True)
handler = logging.FileHandler(log_file)
handler.setFormatter(logging.Formatter(log_format, datefmt="%d.%m.%Y %H:%M:%S"))
root_logger.addHandler(handler)
root_logger.setLevel(log_level)
log_kwargs["filename"] = app.config.get("LOG_FILE", "brecaltest.log")
logging.basicConfig(level=log_level, **log_kwargs)
if app.config.get("SECRET_KEY"):
os.environ["SECRET_KEY"] = app.config["SECRET_KEY"]
defs.SMTP_DEBUG_LEVEL = app.config.get("SMTP_DEBUG_LEVEL", defs.SMTP_DEBUG_LEVEL)
local_db.initPool(os.path.dirname(app.instance_path), config=app.config)
logging.info('App started')
# Setup Routine jobs (e.g., reevaluation of shipcalls)
setup_schedule(
update_shipcalls_interval_in_minutes=app.config.get("SCHEDULE_UPDATE_SHIPCALLS_MINUTES", 60),
notification_cooldown_mins=app.config.get("NOTIFICATION_COOLDOWN_MINS", defs.NOTIFICATION_COOLDOWN_MINS),
)
setup_schedule(update_shipcalls_interval_in_minutes=app.config.get("SCHEDULE_UPDATE_SHIPCALLS_MINUTES", 60))
run_schedule_permanently_in_background(latency=app.config.get("SCHEDULE_BACKGROUND_LATENCY_SECONDS", 30))
logging.info('Routine Jobs are defined.')

View File

@ -17,14 +17,7 @@ def GetHistory():
options = {}
if not 'shipcall_id' in request.args:
return create_dynamic_exception_response(ex=None, status_code=400, message="missing parameter: shipcall_id")
shipcall_id_values = request.args.getlist("shipcall_id")
if len(shipcall_id_values) != 1:
return create_dynamic_exception_response(ex=None, status_code=400, message="invalid parameter: shipcall_id")
shipcall_id_raw = shipcall_id_values[0]
try:
options["shipcall_id"] = int(shipcall_id_raw)
except (TypeError, ValueError):
return create_dynamic_exception_response(ex=None, status_code=400, message="invalid parameter: shipcall_id")
options["shipcall_id"] = request.args.get("shipcall_id")
return impl.history.GetHistory(options)
else:
return create_dynamic_exception_response(ex=None, status_code=403, message="not authenticated")

View File

@ -1,4 +1,4 @@
from flask import Blueprint, request, jsonify
from flask import Blueprint, request
from flask_jwt_extended import create_access_token
from webargs.flaskparser import parser
from ..schemas import model
@ -13,14 +13,4 @@ bp = Blueprint('login', __name__)
def Logon():
options = request.get_json(force=True)
if not isinstance(options, dict):
return jsonify({"error_field": "invalid request body"}), 400
username = options.get("username")
password = options.get("password")
if not username or not password:
return jsonify({"error_field": "username and password required"}), 400
if not isinstance(username, str) or not isinstance(password, str):
return jsonify({"error_field": "username and password must be strings"}), 400
if not username.strip() or not password.strip():
return jsonify({"error_field": "username and password cannot be empty"}), 400
return impl.login.GetUser(options)

View File

@ -16,12 +16,7 @@ def GetParticipant():
if 'Authorization' in request.headers:
payload = decode_jwt(request.headers.get("Authorization").split("Bearer ")[-1])
options = {}
user_id_raw = request.args.get("user_id")
if user_id_raw is not None:
try:
options["user_id"] = int(user_id_raw)
except (TypeError, ValueError):
return create_dynamic_exception_response(ex=None, status_code=400, message="invalid parameter: user_id")
options["user_id"] = request.args.get("user_id")
if "participant_id" in payload:
options["participant_id"] = payload["participant_id"]
else:

View File

@ -35,14 +35,7 @@ def GetShipcalls():
"""
payload = decode_jwt(request.headers.get("Authorization").split("Bearer ")[-1])
options = {}
past_days_raw = request.args.get("past_days", default=None)
if past_days_raw is None:
options["past_days"] = 1
else:
try:
options["past_days"] = int(past_days_raw)
except (TypeError, ValueError):
return create_dynamic_exception_response(ex=None, status_code=400, message="invalid parameter: past_days")
options["past_days"] = request.args.get("past_days", default=1, type=int)
options["participant_id"] = payload["participant_id"]
return impl.shipcalls.GetShipcalls(options)

View File

@ -13,18 +13,6 @@ from BreCal.validators.input_validation_ship import InputValidationShip
bp = Blueprint('ships', __name__)
def _message_contains(messages, needle: str) -> bool:
if not isinstance(messages, dict):
return False
for value in messages.values():
if isinstance(value, list):
text = " ".join(map(str, value))
else:
text = str(value)
if needle in text:
return True
return False
@bp.route('/ships', methods=['get'])
@auth_guard() # no restriction by role
def GetShips():
@ -64,8 +52,6 @@ def PostShip():
return impl.ships.PostShip(loadedModel)
except ValidationError as ex:
if _message_contains(ex.messages, "already exists"):
return create_validation_error_response(ex=ex, status_code=409)
return create_validation_error_response(ex=ex, status_code=400)
except Exception as ex:
@ -93,8 +79,6 @@ def PutShip():
return impl.ships.PutShip(loadedModel)
except ValidationError as ex:
if _message_contains(ex.messages, "may not be changed"):
return create_validation_error_response(ex=ex, status_code=409)
return create_validation_error_response(ex=ex, status_code=400)
except Exception as ex:
@ -127,8 +111,6 @@ def DeleteShip():
return impl.ships.DeleteShip(options)
except ValidationError as ex:
if _message_contains(ex.messages, "already deleted") or _message_contains(ex.messages, "Could not find a ship"):
return create_validation_error_response(ex=ex, status_code=404)
return create_validation_error_response(ex=ex, status_code=400)
except Exception as ex:

View File

@ -18,14 +18,7 @@ def GetTimes():
try:
options = {}
shipcall_id_raw = request.args.get("shipcall_id")
if shipcall_id_raw is not None:
try:
options["shipcall_id"] = int(shipcall_id_raw)
except (TypeError, ValueError):
return create_dynamic_exception_response(ex=None, status_code=400, message="invalid parameter: shipcall_id")
else:
options["shipcall_id"] = None
options["shipcall_id"] = request.args.get("shipcall_id")
return impl.times.GetTimes(options)
except Exception as ex:

View File

@ -1,7 +1,6 @@
import json
import logging
import pydapper
from flask import jsonify
from ..schemas import model
from .. import local_db
@ -34,7 +33,7 @@ def GetBerths(options):
print(ex)
result = {}
result["error_field"] = "call failed"
return jsonify(result), 500
return json.dumps(result), 500
finally:
if pooledConnection is not None:

View File

@ -2,7 +2,6 @@ import json
import logging
import pydapper
import pdb
from flask import jsonify
from ..schemas import model
from ..schemas.model import History
@ -35,7 +34,7 @@ def GetHistory(options):
print(ex)
result = {}
result["error_field"] = "call failed"
return jsonify("call failed"), 500
return json.dumps("call failed"), 500
finally:
if pooledConnection is not None:
pooledConnection.close()

View File

@ -1,7 +1,7 @@
import json
import logging
import pydapper
import bcrypt
from flask import jsonify
from ..schemas import model
from .. import local_db
@ -35,24 +35,24 @@ def GetUser(options):
"user_name": data[0].user_name,
"user_phone": data[0].user_phone,
"user_email": data[0].user_email,
"notify_email": model._coerce_bool(data[0].notify_email),
"notify_whatsapp": model._coerce_bool(data[0].notify_whatsapp),
"notify_signal": model._coerce_bool(data[0].notify_signal),
"notify_popup": model._coerce_bool(data[0].notify_popup),
"notify_email": data[0].notify_email,
"notify_whatsapp": data[0].notify_whatsapp,
"notify_signal": data[0].notify_signal,
"notify_popup": data[0].notify_popup,
"notify_on": model.notification_types_to_names(model.bitflag_to_list(data[0].notify_event))
}
token = jwt_handler.generate_jwt(payload=result, lifetime=120) # generate token valid 60 mins
result["token"] = token # add token to user data
return jsonify(result), 200
return json.dumps(result), 200, {'Content-Type': 'application/json; charset=utf-8'}
if len(data) > 1:
result = {}
result["error_field"] = "credential lookup mismatch"
return jsonify(result), 500
return json.dumps(result), 500, {'Content-Type': 'application/json; charset=utf-8'}
result = {}
result["error_field"] = "invalid credentials"
return jsonify(result), 403
return json.dumps(result), 403, {'Content-Type': 'application/json; charset=utf-8'}
except Exception as ex:
logging.error(ex)
@ -60,7 +60,7 @@ def GetUser(options):
result = {}
result["error_field"] = "call failed"
result["error_description"] = str(ex)
return jsonify(result), 500
return json.dumps(result), 500, {'Content-Type': 'application/json; charset=utf-8'}
finally:
if pooledConnection is not None:

View File

@ -1,7 +1,6 @@
import json
import logging
import pydapper
from flask import jsonify
from ..schemas import model
from .. import local_db
@ -29,7 +28,7 @@ def GetNotifications(token, participant_id=None):
print(ex)
result = {}
result["error_field"] = "call failed"
return jsonify(result), 500
return json.dumps(result), 500, {'Content-Type': 'application/json; charset=utf-8'}
finally:
if pooledConnection is not None:
pooledConnection.close()

View File

@ -1,7 +1,6 @@
import json
import logging
import pydapper
from flask import jsonify
from ..schemas import model
from .. import local_db
@ -60,7 +59,7 @@ def GetParticipant(options):
print(ex)
result = {}
result["error_field"] = "call failed"
return jsonify("call failed"), 500
return json.dumps("call failed"), 500
finally:
if pooledConnection is not None:

View File

@ -1,7 +1,6 @@
import json
import logging
import pydapper
from flask import jsonify
from ..schemas import model
from .. import local_db
@ -24,7 +23,7 @@ def GetPorts(token):
print(ex)
result = {}
result["error_field"] = "call failed"
return jsonify(result), 500
return json.dumps(result), 500
finally:
if pooledConnection is not None:

View File

@ -2,7 +2,6 @@ import json
import logging
import traceback
import pydapper
from flask import jsonify
from ..schemas import model
from .. import local_db
@ -45,7 +44,7 @@ def GetShipcalls(options):
print(ex)
result = {}
result["error_field"] = "call failed"
return jsonify(result), 500
return json.dumps(result), 500, {'Content-Type': 'application/json; charset=utf-8'}
finally:
if pooledConnection is not None:
@ -179,7 +178,7 @@ def PostShipcalls(schemaModel):
print(ex)
result = {}
result["error_field"] = "call failed"
return jsonify(result), 500
return json.dumps(result), 500, {'Content-Type': 'application/json; charset=utf-8'}
finally:
if pooledConnection is not None:
@ -206,7 +205,7 @@ def PutShipcalls(schemaModel, original_payload=None):
theshipcall = commands.query_single_or_default("SELECT * FROM shipcall where id = ?id?", sentinel, param={"id" : schemaModel["id"]})
if theshipcall is sentinel:
return jsonify("no such record"), 404
return json.dumps("no such record"), 404, {'Content-Type': 'application/json; charset=utf-8'}
was_canceled = theshipcall["canceled"]
@ -288,10 +287,8 @@ def PutShipcalls(schemaModel, original_payload=None):
dquery = "DELETE FROM shipcall_participant_map WHERE id = ?existing_id?"
commands.execute(dquery, param={"existing_id" : elem["id"]})
# TODO: Create un-assignment notification but only if level > 0 else delete existing notification
found_assignment_notification = False
for existing_notification in existing_notifications:
if existing_notification["participant_id"] == elem["participant_id"]:
found_assignment_notification = True
if existing_notification["level"] == 0:
nquery = "DELETE FROM notification WHERE id = ?nid?"
commands.execute(nquery, param={"nid" : existing_notification["id"]})
@ -301,10 +298,6 @@ def PutShipcalls(schemaModel, original_payload=None):
nquery = "INSERT INTO notification (shipcall_id, participant_id, level, type, message) VALUES (?shipcall_id?, ?participant_id?, 0, 5, ?message?)"
commands.execute(nquery, param={"shipcall_id" : schemaModel["id"], "participant_id" : elem["participant_id"], "message" : message})
break
if not found_assignment_notification:
message = "The participant has been unassigned from the shipcall."
nquery = "INSERT INTO notification (shipcall_id, participant_id, level, type, message) VALUES (?shipcall_id?, ?participant_id?, 0, 5, ?message?)"
commands.execute(nquery, param={"shipcall_id" : schemaModel["id"], "participant_id" : elem["participant_id"], "message" : message})
canceled_value = schemaModel.get("canceled")
if canceled_value is not None:
@ -332,7 +325,7 @@ def PutShipcalls(schemaModel, original_payload=None):
print(ex)
result = {}
result["error_field"] = "call failed"
return jsonify(result), 500
return json.dumps(result), 500, {'Content-Type': 'application/json; charset=utf-8'}
finally:
if pooledConnection is not None:

View File

@ -1,7 +1,6 @@
import json
import logging
import pydapper
from flask import jsonify
from ..schemas import model
from .. import local_db
@ -27,7 +26,7 @@ def GetShips(token):
print(ex)
result = {}
result["error_field"] = "call failed"
return jsonify(result), 500
return json.dumps(result), 500, {'Content-Type': 'application/json; charset=utf-8'}
finally:
if pooledConnection is not None:
@ -91,7 +90,7 @@ def PostShip(schemaModel):
print(ex)
result = {}
result["error_field"] = "call failed"
return jsonify(result), 500
return json.dumps(result), 500, {'Content-Type': 'application/json; charset=utf-8'}
finally:
if pooledConnection is not None:
pooledConnection.close()
@ -134,7 +133,7 @@ def PutShip(schemaModel):
print(ex)
result = {}
result["error_field"] = "call failed"
return jsonify(result), 500
return json.dumps(result), 500, {'Content-Type': 'application/json; charset=utf-8'}
finally:
if pooledConnection is not None:
pooledConnection.close()
@ -152,22 +151,21 @@ def DeleteShip(options):
commands = pydapper.using(pooledConnection)
# query = SQLQuery.get_ship_delete_by_id()
# affected_rows = commands.execute(query, param={"id" : options["id"]})
ship_id = int(options["id"]) if not isinstance(options["id"], int) else options["id"]
affected_rows = commands.execute("UPDATE ship SET deleted = 1 WHERE id = ?id?", param={"id" : ship_id})
affected_rows = commands.execute("UPDATE ship SET deleted = 1 WHERE id = ?id?", param={"id" : options["id"]})
if affected_rows == 1:
return json.dumps({"id" : ship_id}), 200, {'Content-Type': 'application/json; charset=utf-8'}
return json.dumps({"id" : options["id"]}), 200, {'Content-Type': 'application/json; charset=utf-8'}
result = {}
result["error_field"] = "no such record"
return jsonify(result), 404
return json.dumps(result), 404, {'Content-Type': 'application/json; charset=utf-8'}
except Exception as ex:
logging.error(ex)
print(ex)
result = {}
result["error_field"] = "call failed"
return jsonify(result), 500
return json.dumps(result), 500, {'Content-Type': 'application/json; charset=utf-8'}
finally:
if pooledConnection is not None:
pooledConnection.close()

View File

@ -2,7 +2,6 @@ import json
import logging
import traceback
import pydapper
from flask import jsonify
from enum import Enum, Flag
from ..schemas import model
@ -36,7 +35,7 @@ def GetTimes(options):
print(ex)
result = {}
result["error_field"] = "call failed"
return jsonify(result), 500
return json.dumps(result), 500, {'Content-Type': 'application/json; charset=utf-8'}
finally:
if pooledConnection is not None:
@ -109,7 +108,7 @@ def PostTimes(schemaModel):
print(ex)
result = {}
result["error_field"] = "call failed"
return jsonify(result), 500
return json.dumps(result), 500, {'Content-Type': 'application/json; charset=utf-8'}
finally:
if pooledConnection is not None:
@ -131,7 +130,7 @@ def PutTimes(schemaModel, original_payload=None):
sentinel = object()
existing_times = commands.query_single_or_default("SELECT * FROM times WHERE id = ?id?", sentinel, param={"id": schemaModel["id"]})
if existing_times is sentinel:
return jsonify("no such record"), 404
return json.dumps("no such record"), 404, {'Content-Type': 'application/json; charset=utf-8'}
provided_keys = set(original_payload.keys()) if isinstance(original_payload, dict) else None
@ -178,7 +177,7 @@ def PutTimes(schemaModel, original_payload=None):
print(ex)
result = {}
result["error_field"] = "call failed"
return jsonify(result), 500
return json.dumps(result), 500, {'Content-Type': 'application/json; charset=utf-8'}
finally:
if pooledConnection is not None:
@ -194,9 +193,8 @@ def DeleteTimes(options):
try:
pooledConnection = local_db.getPoolConnection()
commands = pydapper.using(pooledConnection)
times_id = int(options["id"]) if not isinstance(options["id"], int) else options["id"]
shipcall_id = commands.execute_scalar("SELECT shipcall_id FROM times WHERE id = ?id?", param={"id" : times_id})
affected_rows = commands.execute("DELETE FROM times WHERE id = ?id?", param={"id" : times_id})
shipcall_id = commands.execute_scalar("SELECT shipcall_id FROM times WHERE id = ?id?", param={"id" : options["id"]})
affected_rows = commands.execute("DELETE FROM times WHERE id = ?id?", param={"id" : options["id"]})
# TODO: set ETA properly?
@ -206,18 +204,18 @@ def DeleteTimes(options):
commands.execute(query, {"pid" : user_data["participant_id"], "shipcall_id" : shipcall_id, "uid" : user_data["id"]})
if affected_rows == 1:
return json.dumps({"id" : times_id}), 200, {'Content-Type': 'application/json; charset=utf-8'}
return json.dumps({"id" : options["id"]}), 200, {'Content-Type': 'application/json; charset=utf-8'}
result = {}
result["error_field"] = "no such record"
return jsonify(result), 404
return json.dumps(result), 404, {'Content-Type': 'application/json; charset=utf-8'}
except Exception as ex:
logging.error(ex)
print(ex)
result = {}
result["error_field"] = "call failed"
return jsonify(result), 500
return json.dumps(result), 500, {'Content-Type': 'application/json; charset=utf-8'}
finally:
if pooledConnection is not None:

View File

@ -2,7 +2,6 @@ import json
import logging
import pydapper
import bcrypt
from flask import jsonify
from ..schemas import model
from .. import local_db
@ -28,7 +27,7 @@ def PutUser(schemaModel):
theuser = commands.query_single_or_default("SELECT * FROM user where id = ?id?", sentinel, param={"id" : schemaModel["id"]}, model=model.User)
if theuser is sentinel:
# #TODO: result = {"message":"no such record"} -> json.dumps
return jsonify({"error_field": "no such record"}), 404
return json.dumps("no such record"), 404, {'Content-Type': 'application/json; charset=utf-8'}
# see if we need to update public fields
# #TODO_determine: this filter blocks Put-Requests, which update the 'notify_email', 'notify_whatsapp', 'notify_signal', 'notify_popup' fields
@ -71,16 +70,16 @@ def PutUser(schemaModel):
else:
result = {}
result["error_field"] = "old password invalid"
return jsonify(result), 400
return json.dumps(result), 400, {'Content-Type': 'application/json; charset=utf-8'}
return jsonify({"id" : schemaModel["id"]}), 200
return json.dumps({"id" : schemaModel["id"]}), 200
except Exception as ex:
logging.error(ex)
print(ex)
result = {}
result["error_field"] = "call failed"
return jsonify(result), 500
return json.dumps(result), 500, {'Content-Type': 'application/json; charset=utf-8'}
finally:
if pooledConnection is not None:

View File

@ -18,6 +18,3 @@ shipcall_types = {
3: "Shifting"
}
# Email transport logging (0 = quiet, 1 = verbose)
SMTP_DEBUG_LEVEL = 0

View File

@ -1,5 +1,5 @@
from dataclasses import field, dataclass
from marshmallow import Schema, fields, INCLUDE, ValidationError, validate, validates, post_load, pre_load, EXCLUDE
from marshmallow import Schema, fields, INCLUDE, ValidationError, validate, validates, post_load
from marshmallow.fields import Field
from marshmallow_enum import EnumField
from enum import IntEnum
@ -17,31 +17,13 @@ from BreCal.database.enums import ParticipantType, ParticipantFlag
def _format_datetime(value):
if value is None:
return None
if isinstance(value, datetime.datetime):
if value.tzinfo is None or value.tzinfo.utcoffset(value) is None:
return value.isoformat()
return value.astimezone().replace(tzinfo=None).isoformat()
return value
def obj_dict(obj):
if isinstance(obj, datetime.datetime):
return _format_datetime(obj)
return obj.isoformat()
if hasattr(obj, 'to_json'):
return obj.to_json()
return obj.__dict__
def _coerce_bool(value):
if value is None:
return None
if isinstance(value, bool):
return value
if isinstance(value, int) and value in (0, 1):
return bool(value)
return value
@dataclass
class Berth(Schema):
id: int
@ -54,19 +36,6 @@ class Berth(Schema):
modified: datetime
deleted: bool
def to_json(self):
return {
"id": self.id,
"name": self.name,
"lock": _coerce_bool(self.lock),
"owner_id": self.owner_id,
"authority_id": self.authority_id,
"port_id": self.port_id,
"created": _format_datetime(self.created),
"modified": _format_datetime(self.modified),
"deleted": _coerce_bool(self.deleted),
}
@dataclass
class Port(Schema):
id: int
@ -76,16 +45,6 @@ class Port(Schema):
modified: datetime
deleted: bool
def to_json(self):
return {
"id": self.id,
"name": self.name,
"locode": self.locode,
"created": _format_datetime(self.created),
"modified": _format_datetime(self.modified),
"deleted": _coerce_bool(self.deleted),
}
class OperationType(IntEnum):
undefined = 0
insert = 1
@ -189,8 +148,8 @@ class History:
"id": self.id,
"participant_id": self.participant_id,
"shipcall_id": self.shipcall_id,
"timestamp": _format_datetime(self.timestamp),
"eta": _format_datetime(self.eta),
"timestamp": self.timestamp.isoformat() if self.timestamp else "",
"eta": self.eta.isoformat() if self.eta else "",
"type": self.type.name if isinstance(self.type, IntEnum) else ObjectType(self.type).name,
"operation": self.operation.name if isinstance(self.operation, IntEnum) else OperationType(self.operation).name
}
@ -231,8 +190,8 @@ class Notification:
"level": self.level,
"type": self.type.name if isinstance(self.type, IntEnum) else NotificationType(self.type).name,
"message": self.message,
"created": _format_datetime(self.created),
"modified": _format_datetime(self.modified)
"created": self.created.isoformat() if self.created else "",
"modified": self.modified.isoformat() if self.modified else ""
}
@classmethod
@ -253,21 +212,6 @@ class Participant(Schema):
deleted: bool
ports: List[int] = field(default_factory=list)
def to_json(self):
return {
"id": self.id,
"name": self.name,
"street": self.street,
"postal_code": self.postal_code,
"city": self.city,
"type": self.type,
"flags": self.flags,
"created": _format_datetime(self.created),
"modified": _format_datetime(self.modified),
"deleted": _coerce_bool(self.deleted),
"ports": self.ports,
}
@validates("type")
def validate_type(self, value, **kwargs):
# e.g., when an IntFlag has the values 1,2,4; the maximum valid value is 7
@ -299,7 +243,7 @@ class ParticipantAssignmentSchema(Schema):
class ShipcallSchema(Schema):
def __init__(self):
super().__init__(unknown=EXCLUDE)
super().__init__(unknown=None)
pass
id = fields.Integer(required=True)
@ -421,34 +365,34 @@ class Shipcall:
"id": self.id,
"ship_id": self.ship_id,
"type": self.type.name if isinstance(self.type, IntEnum) else ShipcallType(self.type).name,
"eta": _format_datetime(self.eta),
"eta": self.eta.isoformat() if self.eta else "",
"voyage": self.voyage,
"etd": _format_datetime(self.etd),
"etd": self.etd.isoformat() if self.etd else "",
"arrival_berth_id": self.arrival_berth_id,
"departure_berth_id": self.departure_berth_id,
"tug_required": _coerce_bool(self.tug_required),
"pilot_required": _coerce_bool(self.pilot_required),
"tug_required": self.tug_required,
"pilot_required": self.pilot_required,
"flags": self.flags,
"pier_side": _coerce_bool(self.pier_side),
"bunkering": _coerce_bool(self.bunkering),
"replenishing_terminal": _coerce_bool(self.replenishing_terminal),
"replenishing_lock": _coerce_bool(self.replenishing_lock),
"pier_side": self.pier_side,
"bunkering": self.bunkering,
"replenishing_terminal": self.replenishing_terminal,
"replenishing_lock": self.replenishing_lock,
"draft": self.draft,
"tidal_window_from": _format_datetime(self.tidal_window_from),
"tidal_window_to": _format_datetime(self.tidal_window_to),
"rain_sensitive_cargo": _coerce_bool(self.rain_sensitive_cargo),
"tidal_window_from": self.tidal_window_from.isoformat() if self.tidal_window_from else "",
"tidal_window_to": self.tidal_window_to.isoformat() if self.tidal_window_to else "",
"rain_sensitive_cargo": self.rain_sensitive_cargo,
"recommended_tugs": self.recommended_tugs,
"anchored": _coerce_bool(self.anchored),
"moored_lock": _coerce_bool(self.moored_lock),
"canceled": _coerce_bool(self.canceled),
"anchored": self.anchored,
"moored_lock": self.moored_lock,
"canceled": self.canceled,
"evaluation": self.evaluation.name if isinstance(self.evaluation, IntEnum) else EvaluationType(self.evaluation).name,
"evaluation_message": self.evaluation_message,
"evaluation_time": _format_datetime(self.evaluation_time),
"evaluation_notifications_sent": _coerce_bool(self.evaluation_notifications_sent),
"evaluation_time": self.evaluation_time.isoformat() if self.evaluation_time else "",
"evaluation_notifications_sent": self.evaluation_notifications_sent,
"time_ref_point": self.time_ref_point,
"port_id": self.port_id,
"created": _format_datetime(self.created),
"modified": _format_datetime(self.modified),
"created": self.created.isoformat() if self.created else "",
"modified": self.modified.isoformat() if self.modified else "",
"participants": [participant.__dict__ for participant in self.participants]
}
@ -456,39 +400,7 @@ class Shipcall:
@classmethod
def from_query_row(self, id, ship_id, type, eta, voyage, etd, arrival_berth_id, departure_berth_id, tug_required, pilot_required, flags, pier_side, bunkering, replenishing_terminal, replenishing_lock, draft, tidal_window_from, tidal_window_to, rain_sensitive_cargo, recommended_tugs, anchored, moored_lock, canceled, evaluation, evaluation_message, evaluation_time, evaluation_notifications_sent, time_ref_point, port_id, created, modified):
return self(
id,
ship_id,
ShipcallType(type),
eta,
voyage,
etd,
arrival_berth_id,
departure_berth_id,
_coerce_bool(tug_required),
_coerce_bool(pilot_required),
flags,
_coerce_bool(pier_side),
_coerce_bool(bunkering),
_coerce_bool(replenishing_terminal),
_coerce_bool(replenishing_lock),
draft,
tidal_window_from,
tidal_window_to,
_coerce_bool(rain_sensitive_cargo),
recommended_tugs,
_coerce_bool(anchored),
_coerce_bool(moored_lock),
_coerce_bool(canceled),
EvaluationType(evaluation),
evaluation_message,
evaluation_time,
_coerce_bool(evaluation_notifications_sent),
time_ref_point,
port_id,
created,
modified
)
return self(id, ship_id, ShipcallType(type), eta, voyage, etd, arrival_berth_id, departure_berth_id, tug_required, pilot_required, flags, pier_side, bunkering, replenishing_terminal, replenishing_lock, draft, tidal_window_from, tidal_window_to, rain_sensitive_cargo, recommended_tugs, anchored, moored_lock, canceled, EvaluationType(evaluation), evaluation_message, evaluation_time, evaluation_notifications_sent, time_ref_point, port_id, created, modified)
class ShipcallId(Schema):
pass
@ -603,9 +515,9 @@ class UserSchema(Schema):
super().__init__(unknown=None)
pass
id = fields.Integer(required=True)
first_name = fields.String(allow_none=True, required=False, validate=[validate.Length(max=45)])
last_name = fields.String(allow_none=True, required=False, validate=[validate.Length(max=45)])
user_phone = fields.String(allow_none=True, required=False, validate=[validate.Length(max=32)])
first_name = fields.String(allow_none=True, required=False, validate=[validate.Length(max=64)])
last_name = fields.String(allow_none=True, required=False, validate=[validate.Length(max=64)])
user_phone = fields.String(allow_none=True, required=False)
user_email = fields.String(allow_none=True, required=False, validate=[validate.Length(max=64)])
old_password = fields.String(allow_none=True, required=False, validate=[validate.Length(max=128)])
new_password = fields.String(allow_none=True, required=False, validate=[validate.Length(min=6, max=128)])
@ -615,14 +527,6 @@ class UserSchema(Schema):
notify_popup = fields.Bool(allow_none=True, required=False)
notify_on = fields.List(fields.Enum(NotificationType), required=False, allow_none=True)
@pre_load
def validate_bool_types(self, data, **kwargs):
bool_fields = ["notify_email", "notify_whatsapp", "notify_signal", "notify_popup"]
for field_name in bool_fields:
if field_name in data and data[field_name] is not None and not isinstance(data[field_name], bool):
raise ValidationError({field_name: "must be a boolean"})
return data
@validates("user_phone")
def validate_user_phone(self, value, **kwargs):
if value is not None:
@ -663,34 +567,6 @@ class Times:
created: datetime
modified: datetime
def to_json(self):
return {
"id": self.id,
"eta_berth": _format_datetime(self.eta_berth),
"eta_berth_fixed": _coerce_bool(self.eta_berth_fixed),
"etd_berth": _format_datetime(self.etd_berth),
"etd_berth_fixed": _coerce_bool(self.etd_berth_fixed),
"lock_time": _format_datetime(self.lock_time),
"lock_time_fixed": _coerce_bool(self.lock_time_fixed),
"zone_entry": _format_datetime(self.zone_entry),
"zone_entry_fixed": _coerce_bool(self.zone_entry_fixed),
"operations_start": _format_datetime(self.operations_start),
"operations_end": _format_datetime(self.operations_end),
"remarks": self.remarks,
"participant_id": self.participant_id,
"berth_id": self.berth_id,
"berth_info": self.berth_info,
"pier_side": _coerce_bool(self.pier_side),
"participant_type": self.participant_type,
"shipcall_id": self.shipcall_id,
"ata": _format_datetime(self.ata),
"atd": _format_datetime(self.atd),
"eta_interval_end": _format_datetime(self.eta_interval_end),
"etd_interval_end": _format_datetime(self.etd_interval_end),
"created": _format_datetime(self.created),
"modified": _format_datetime(self.modified),
}
@dataclass
class User:
@ -734,23 +610,6 @@ class Ship:
modified: datetime
deleted: bool
def to_json(self):
return {
"id": self.id,
"name": self.name if self.name is not None else "",
"imo": self.imo,
"callsign": self.callsign,
"participant_id": self.participant_id,
"length": self.length,
"width": self.width,
"is_tug": _coerce_bool(self.is_tug),
"bollard_pull": self.bollard_pull,
"eni": self.eni,
"created": _format_datetime(self.created),
"modified": _format_datetime(self.modified),
"deleted": _coerce_bool(self.deleted),
}
class ShipSchema(Schema):
def __init__(self):
@ -771,14 +630,6 @@ class ShipSchema(Schema):
modified = fields.DateTime(allow_none=True, required=False)
deleted = fields.Bool(allow_none=True, required=False, load_default=False, dump_default=False)
@pre_load
def validate_bool_types(self, data, **kwargs):
bool_fields = ["is_tug", "deleted"]
for field_name in bool_fields:
if field_name in data and data[field_name] is not None and not isinstance(data[field_name], bool):
raise ValidationError({field_name: "must be a boolean"})
return data
@validates("name")
def validate_name(self, value, **kwargs):
character_length = len(str(value))
@ -845,6 +696,6 @@ class ShipcallParticipantMap:
"shipcall_id": self.shipcall_id,
"participant_id": self.participant_id,
"type": self.type.name if isinstance(self.type, IntEnum) else ShipcallType(self.type).name,
"created": _format_datetime(self.created),
"modified": _format_datetime(self.modified),
"created": self.created.isoformat() if self.created else "",
"modified": self.modified.isoformat() if self.modified else "",
}

View File

@ -1,4 +1,5 @@
from flask import request, jsonify
import json
from flask import request
from .jwt_handler import decode_jwt
def check_jwt():
@ -24,11 +25,11 @@ def auth_guard(role=None):
try:
user_data = check_jwt()
except Exception as e:
return jsonify({"error_field" : f'{e}', "status": 401}), 401
return json.dumps({"error_field" : f'{e}', "status": 401}), 401
if role and role not in user_data['roles']:
return jsonify({"error_field": 'Authorization required.', "status" : 403}), 403
return json.dumps({"error_field": 'Authorization required.', "status" : 403}), 403
# get on to original route
return route_function(*args, **kwargs)
decorated_function.__name__ = route_function.__name__
return decorated_function
return wrapper
return wrapper

View File

@ -3,7 +3,6 @@ import pydapper
import smtplib
import json
import os
import base64
from email.message import EmailMessage
from BreCal.schemas import model, defs
@ -111,60 +110,12 @@ def SendEmails(email_dict):
pooledConnection = getPoolConnection()
commands = pydapper.using(pooledConnection)
def _parse_bool(value) -> bool:
if isinstance(value, bool):
return value
if value is None:
return False
return str(value).strip().lower() in ("1", "true", "yes", "y", "on")
def _smtp_auth_ntlm(connection, username: str, password: str, domain: str | None = None, workstation: str | None = None):
try:
from ntlm_auth.ntlm import NtlmContext
except Exception as exc:
raise RuntimeError("NTLM auth requested but ntlm-auth is not installed") from exc
context = NtlmContext(username, password, domain=domain, workstation=workstation)
negotiate = context.step()
code, challenge = connection.docmd("AUTH", "NTLM " + base64.b64encode(negotiate).decode("ascii"))
if code != 334:
raise smtplib.SMTPException(f"NTLM negotiate failed: {code} {challenge}")
challenge_bytes = base64.b64decode(challenge.strip())
authenticate = context.step(challenge_bytes)
code, msg = connection.docmd(base64.b64encode(authenticate).decode("ascii"))
if code != 235:
raise smtplib.SMTPAuthenticationError(code, msg)
encryption = defs.email_credentials.get("encryption") or defs.email_credentials.get("encryption_method") or "STARTTLS"
encryption_norm = str(encryption).strip().upper().replace(" ", "").replace("_", "").replace("-", "")
use_ntlm_auth = _parse_bool(defs.email_credentials.get("USE_NTLM_AUTH"))
if encryption_norm in ("SSLTLS", "SSL", "TLS"):
conn = smtplib.SMTP_SSL(defs.email_credentials["server"], defs.email_credentials["port"])
else:
conn = smtplib.SMTP(defs.email_credentials["server"], defs.email_credentials["port"])
try:
smtp_debug_level = int(defs.SMTP_DEBUG_LEVEL)
except (TypeError, ValueError):
smtp_debug_level = 0
conn.set_debuglevel(1 if smtp_debug_level else 0)
conn = smtplib.SMTP(defs.email_credentials["server"], defs.email_credentials["port"])
conn.set_debuglevel(1) # set this to 0 to disable debug output to log
conn.ehlo()
if encryption_norm in ("STARTTLS", "STARTSSL"):
conn.starttls()
conn.ehlo()
elif encryption_norm in ("NONE", "NO", "DISABLE"):
pass
elif encryption_norm not in ("SSLTLS", "SSL", "TLS"):
logging.warning("Unknown email encryption '%s'; defaulting to STARTTLS.", encryption)
conn.starttls()
conn.ehlo()
auth_username = defs.email_credentials.get("auth_username") or defs.email_credentials.get("sender")
if use_ntlm_auth:
ntlm_user = defs.email_credentials.get("ntlm_user") or auth_username
ntlm_domain = defs.email_credentials.get("ntlm_domain")
ntlm_workstation = defs.email_credentials.get("ntlm_workstation")
_smtp_auth_ntlm(conn, ntlm_user, defs.email_credentials["password_send"], domain=ntlm_domain, workstation=ntlm_workstation)
else:
conn.login(auth_username, defs.email_credentials["password_send"])
conn.starttls()
conn.ehlo()
conn.login(defs.email_credentials["sender"], defs.email_credentials["password_send"])
current_path = os.path.dirname(os.path.abspath(__file__))
@ -310,13 +261,6 @@ def SendNotifications():
pass
# mark as sent
logging.info(
"Notification finalized (level=2): id=%s shipcall_id=%s participant_id=%s type=%s",
notification.id,
notification.shipcall_id,
notification.participant_id,
notification.type,
)
commands.execute("UPDATE notification SET level = 2 WHERE id = ?id?", param={"id":notification.id})
# send emails (if any)
@ -382,7 +326,7 @@ def eval_next_24_hrs():
return
def setup_schedule(update_shipcalls_interval_in_minutes:int=60, notification_cooldown_mins:int=defs.NOTIFICATION_COOLDOWN_MINS):
def setup_schedule(update_shipcalls_interval_in_minutes:int=60):
logging.getLogger('schedule').setLevel(logging.INFO); # set the logging level of the schedule module to INFO
@ -391,7 +335,7 @@ def setup_schedule(update_shipcalls_interval_in_minutes:int=60, notification_coo
# update the evaluation state in every recent shipcall
add_function_to_schedule__update_shipcalls(update_shipcalls_interval_in_minutes)
add_function_to_evaluate_notifications(notification_cooldown_mins)
add_function_to_evaluate_notifications(defs.NOTIFICATION_COOLDOWN_MINS)
add_function_to_clear_notifications(defs.NOTIFICATION_MAX_AGE_DAYS)

View File

@ -177,8 +177,6 @@ class InputValidationTimes(InputValidationBase):
# validates the times dataset.
# ensure loadedModel["participant_type"] is of type ParticipantType
if loadedModel.get("participant_type") is None:
raise ValidationError({"participant_type": "participant_type is required"})
if not isinstance(loadedModel["participant_type"], ParticipantType):
loadedModel["participant_type"] = ParticipantType(loadedModel["participant_type"])
@ -231,10 +229,8 @@ class InputValidationTimes(InputValidationBase):
The dependent and independent fields are validated by checking, whether the respective value in 'content'
is undefined (returns None). When any of these fields is undefined, a ValidationError is raised.
"""
participant_type = loadedModel.get("participant_type")
shipcall_id = loadedModel.get("shipcall_id")
if shipcall_id is None:
raise ValidationError({"shipcall_id": "shipcall_id is required"})
participant_type = loadedModel["participant_type"]
shipcall_id = loadedModel["shipcall_id"]
# build a dictionary of id:item pairs, so one can select the respective participant
# must look-up the shipcall_type based on the shipcall_id
@ -323,9 +319,7 @@ class InputValidationTimes(InputValidationBase):
"""
### TIMES DATASET (ShipcallParticipantMap) ###
# identify shipcall_id
shipcall_id = loadedModel.get("shipcall_id")
if loadedModel.get("participant_type") is None:
raise ValidationError({"participant_type": "participant_type is required"})
shipcall_id = loadedModel["shipcall_id"]
DATASET_participant_type = ParticipantType(loadedModel["participant_type"]) if not isinstance(loadedModel["participant_type"],ParticipantType) else loadedModel["participant_type"]
# get ShipcallParticipantMap for the shipcall_id
@ -440,8 +434,6 @@ class InputValidationTimes(InputValidationBase):
# get the matching entry from the shipcall participant map, where the role matches. Raise an error, when there is no match.
assigned_agency = get_assigned_participant_of_type(shipcall_id, participant_type=ParticipantType.AGENCY)
if assigned_agency is None:
raise ValidationError({"participant_type": "the assigned agency for this shipcall could not be resolved."})
# a) the user has the participant ID of the assigned entry for a given role
user_is_assigned_role = user_participant_id == times_assigned_participant.id

View File

@ -2,7 +2,6 @@ import logging
import typing
import json
import sys
from flask import jsonify
from marshmallow import ValidationError
from werkzeug.exceptions import Forbidden
@ -30,7 +29,7 @@ def unbundle_(errors, unbundled=[]):
[{'error_field':'keya', 'error_description':['keyb']}, {'error_field':'keyc12', 'error_description':[12]}]
As can be seen, only the subkeys and their respective value are received. Each value is *always* a list.
"""
{k:unbundle_(v,unbundled=unbundled) if isinstance(v,dict) else unbundled.append({"error_field":k, "error_description": str(v[0] if isinstance(v,list) else v)}) for k,v in errors.items()}
{k:unbundle_(v,unbundled=unbundled) if isinstance(v,dict) else unbundled.append({"error_field":k, "error_description":v[0] if isinstance(v,list) else str(v)}) for k,v in errors.items()}
return
def unbundle_validation_error_message(message):
@ -41,8 +40,8 @@ def unbundle_validation_error_message(message):
unbundled = []
unbundle_(message, unbundled=unbundled)
if len(unbundled)>0:
error_field = "ValidationError in the following field(s): " + " & ".join([str(unb["error_field"]) for unb in unbundled])
error_description = " " . join([str(unb["error_description"]) for unb in unbundled])
error_field = "ValidationError in the following field(s): " + " & ".join([unb["error_field"] for unb in unbundled])
error_description = " " . join([unb["error_description"] for unb in unbundled])
else:
error_field = "ValidationError"
error_description = "unknown validation error"
@ -54,28 +53,35 @@ def create_validation_error_response(ex:ValidationError, status_code:int=400, cr
(error_field, error_description) = unbundle_validation_error_message(message)
json_response = create_default_json_response_format(error_field=error_field, error_description=error_description)
# json.dumps with default=str automatically converts non-serializable values to strings. Hence, datetime objects (which are not)
# natively serializable are properly serialized.
serialized_response = json.dumps(json_response, default=str)
if create_log:
logging.warning(ex) if ex is not None else logging.warning(message)
# print(ex) if ex is not None else print(message)
return (jsonify(json_response), status_code)
return (serialized_response, status_code)
def create_werkzeug_error_response(ex:Forbidden, status_code:int=403, create_log:bool=True)->typing.Tuple[str,int]:
# json.dumps with default=str automatically converts non-serializable values to strings. Hence, datetime objects (which are not)
# natively serializable are properly serialized.
message = ex.description
json_response = create_default_json_response_format(error_field=str(repr(ex)), error_description=message)
serialized_response = json.dumps(json_response, default=str)
if create_log:
logging.warning(ex) if ex is not None else logging.warning(message)
# print(ex) if ex is not None else print(message)
return jsonify(json_response), status_code
return serialized_response, status_code
def create_dynamic_exception_response(ex, status_code:int=400, message:typing.Optional[str]=None, create_log:bool=True):
message = repr(ex) if message is None else message
json_response = create_default_json_response_format(error_field="Exception", error_description=message)
json_response["error_field"] = "call failed"
serialized_response = json.dumps(json_response, default=str)
if create_log:
logging.warning(ex) if ex is not None else logging.warning(message)
# print(ex) if ex is not None else print(message)
return (jsonify(json_response), status_code)
return (serialized_response, status_code)

View File

@ -135,22 +135,15 @@ class ValidationRules(ValidationRuleFunctions):
for participant in participants:
commands.execute(query, param={"shipcall_id" : int(shipcall_id), "participant_id" : participant["participant_id"], "message" : violation})
resolves_time_conflict = (state_old == StatusFlags.RED.value) and (state_new in [StatusFlags.GREEN.value, StatusFlags.YELLOW.value])
if resolves_time_conflict:
notification_type = 3 # time_conflict
if state_new == 1 and state_old != 0: # this resolves the time conflict
logging.info(f"Resolving notifications for shipcall {shipcall_id}, type={notification_type}")
query = "SELECT COUNT(*) as cnt FROM notification WHERE shipcall_id = ?shipcall_id? AND type = ?type?"
result = commands.query(query, model=dict, param={"shipcall_id" : int(shipcall_id), "type" : notification_type})
has_conflict_notification = (len(result) > 0) and (result[0].get("cnt", 0) > 0)
if has_conflict_notification:
query = "DELETE from notification where shipcall_id = ?shipcall_id? and type = ?type? and level = 0"
deleted_count = commands.execute(query, param={"shipcall_id" : int(shipcall_id), "type" : notification_type})
logging.info(f"Deleted {deleted_count} existing notifications (yet unsent)")
query = f"DELETE from notification where shipcall_id = ?shipcall_id? and type = {notification_type} and level = 0"
deleted_count = commands.execute(query, param={"shipcall_id" : int(shipcall_id)})
logging.info(f"Deleted {deleted_count} existing notifications (yet unsent)")
if deleted_count == 0:
query = "INSERT INTO notification (shipcall_id, participant_id, type, level) VALUES (?shipcall_id?, ?participant_id?, 4, 0)"
for participant in participants:
commands.execute(query, param={"shipcall_id" : int(shipcall_id), "participant_id" : participant["participant_id"]})
else:
logging.info(f"Skipping resolve notification for shipcall {shipcall_id} because no prior time_conflict exists")
finally:
if pooledConnection is not None:
pooledConnection.close()

View File

@ -7,7 +7,7 @@ def base_url() -> str:
# Example: https://dev.api.mycompany.com
url = os.environ.get("API_BASE_URL")
if not url:
url = "http://127.0.0.1:5000"
url = "http://neptun.fritz.box"
# raise RuntimeError("Set API_BASE_URL")
return url.rstrip("/")

View File

@ -1,16 +1,6 @@
import os
import pytest
import schemathesis
from schemathesis import checks
schema = schemathesis.openapi.from_path(
"../../../misc/BreCalApi.yaml",
)
schema.base_url = (os.environ.get("API_BASE_URL") or "http://127.0.0.1:5000").rstrip("/")
@pytest.fixture(scope="session", autouse=True)
def _set_schema_base_url(base_url: str) -> None:
schema.base_url = base_url
schema = schemathesis.openapi.from_path("../../../misc/BreCalApi.yaml")
@schema.parametrize()
def test_api_conformance(
@ -19,18 +9,10 @@ def test_api_conformance(
auth_headers: dict[str, str],
login_payload: dict[str, str],
) -> None:
case.operation.schema.base_url = base_url
transport = getattr(case.operation.schema, "transport", None)
if transport is not None and hasattr(transport, "base_url"):
transport.base_url = base_url
# Calls your real service:
if case.path == "/login" and case.method.upper() == "POST":
response = case.call(base_url=base_url)
response = case.call(base_url=base_url, json=login_payload)
else:
response = case.call(base_url=base_url, headers=auth_headers)
checks.load_all_checks()
custom_checks = [c for c in checks.CHECKS.get_all() if c.__name__ != "ignored_auth"]
# Validates status code, headers, and body against the OpenAPI schema:
case.validate_response(response, checks=custom_checks)
case.validate_response(response)