Compare commits

..

2 Commits

19 changed files with 246 additions and 71 deletions

2
.gitignore vendored
View File

@ -289,3 +289,5 @@ cython_debug/
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
src/notebooks_metz/
src/server/editable_requirements.txt
schemathesis_report.html

View File

@ -95,6 +95,14 @@ paths:
$ref: '#/components/responses/400'
'401':
$ref: '#/components/responses/401'
'404':
description: Not found
content:
application/json:
schema:
$ref: '#/components/schemas/Error'
example:
error_field: no such record
'500':
$ref: '#/components/responses/500'
'503':
@ -800,10 +808,14 @@ components:
properties:
username:
type: string
minLength: 1
pattern: "\\S"
example: alfred
password:
type: string
format: password
minLength: 1
pattern: "\\S"
example: '123456'
required:
- username
@ -1701,19 +1713,25 @@ components:
example: 5
first_name:
type: string
maxLength: 45
example: John
last_name:
type: string
maxLength: 45
example: Doe
user_name:
type: string
maxLength: 45
example: johndoe
user_phone:
type: string
nullable: true
maxLength: 32
example: '1234567890'
user_email:
type: string
format: email
maxLength: 64
nullable: true
example: no@where.com
notify_email:

View File

@ -19,3 +19,9 @@ Das Ganze funktioniert nur, wenn auch schemathesis und hypothesis in den passend
Aktuell habe ich schemathesis ("latest") und hypothesis 6.120.0:
```pip install "hypothesis==6.120.0"```
Das muss wegen dependencies so blöd gepinnt werden.
Damit pytest die API findet muss API_BASE_URL als Umgebungsvariable gesetzt werden. In Powershell z.B. so:
```powershell
$env:API_BASE_URL = "http://localhost:5000"
```

View File

@ -1,4 +1,4 @@
from flask import Blueprint, request
from flask import Blueprint, request, jsonify
from flask_jwt_extended import create_access_token
from webargs.flaskparser import parser
from ..schemas import model
@ -13,4 +13,14 @@ bp = Blueprint('login', __name__)
def Logon():
options = request.get_json(force=True)
if not isinstance(options, dict):
return jsonify({"error_field": "invalid request body"}), 400
username = options.get("username")
password = options.get("password")
if not username or not password:
return jsonify({"error_field": "username and password required"}), 400
if not isinstance(username, str) or not isinstance(password, str):
return jsonify({"error_field": "username and password must be strings"}), 400
if not username.strip() or not password.strip():
return jsonify({"error_field": "username and password cannot be empty"}), 400
return impl.login.GetUser(options)

View File

@ -1,6 +1,7 @@
import json
import logging
import pydapper
from flask import jsonify
from ..schemas import model
from .. import local_db
@ -33,7 +34,7 @@ def GetBerths(options):
print(ex)
result = {}
result["error_field"] = "call failed"
return json.dumps(result), 500
return jsonify(result), 500
finally:
if pooledConnection is not None:

View File

@ -2,6 +2,7 @@ import json
import logging
import pydapper
import pdb
from flask import jsonify
from ..schemas import model
from ..schemas.model import History
@ -34,7 +35,7 @@ def GetHistory(options):
print(ex)
result = {}
result["error_field"] = "call failed"
return json.dumps("call failed"), 500
return jsonify("call failed"), 500
finally:
if pooledConnection is not None:
pooledConnection.close()

View File

@ -1,7 +1,7 @@
import json
import logging
import pydapper
import bcrypt
from flask import jsonify
from ..schemas import model
from .. import local_db
@ -35,24 +35,24 @@ def GetUser(options):
"user_name": data[0].user_name,
"user_phone": data[0].user_phone,
"user_email": data[0].user_email,
"notify_email": data[0].notify_email,
"notify_whatsapp": data[0].notify_whatsapp,
"notify_signal": data[0].notify_signal,
"notify_popup": data[0].notify_popup,
"notify_email": model._coerce_bool(data[0].notify_email),
"notify_whatsapp": model._coerce_bool(data[0].notify_whatsapp),
"notify_signal": model._coerce_bool(data[0].notify_signal),
"notify_popup": model._coerce_bool(data[0].notify_popup),
"notify_on": model.notification_types_to_names(model.bitflag_to_list(data[0].notify_event))
}
token = jwt_handler.generate_jwt(payload=result, lifetime=120) # generate token valid 60 mins
result["token"] = token # add token to user data
return json.dumps(result), 200, {'Content-Type': 'application/json; charset=utf-8'}
return jsonify(result), 200
if len(data) > 1:
result = {}
result["error_field"] = "credential lookup mismatch"
return json.dumps(result), 500, {'Content-Type': 'application/json; charset=utf-8'}
return jsonify(result), 500
result = {}
result["error_field"] = "invalid credentials"
return json.dumps(result), 403, {'Content-Type': 'application/json; charset=utf-8'}
return jsonify(result), 403
except Exception as ex:
logging.error(ex)
@ -60,7 +60,7 @@ def GetUser(options):
result = {}
result["error_field"] = "call failed"
result["error_description"] = str(ex)
return json.dumps(result), 500, {'Content-Type': 'application/json; charset=utf-8'}
return jsonify(result), 500
finally:
if pooledConnection is not None:

View File

@ -1,6 +1,7 @@
import json
import logging
import pydapper
from flask import jsonify
from ..schemas import model
from .. import local_db
@ -28,7 +29,7 @@ def GetNotifications(token, participant_id=None):
print(ex)
result = {}
result["error_field"] = "call failed"
return json.dumps(result), 500, {'Content-Type': 'application/json; charset=utf-8'}
return jsonify(result), 500
finally:
if pooledConnection is not None:
pooledConnection.close()

View File

@ -1,6 +1,7 @@
import json
import logging
import pydapper
from flask import jsonify
from ..schemas import model
from .. import local_db
@ -59,7 +60,7 @@ def GetParticipant(options):
print(ex)
result = {}
result["error_field"] = "call failed"
return json.dumps("call failed"), 500
return jsonify("call failed"), 500
finally:
if pooledConnection is not None:

View File

@ -1,6 +1,7 @@
import json
import logging
import pydapper
from flask import jsonify
from ..schemas import model
from .. import local_db
@ -23,7 +24,7 @@ def GetPorts(token):
print(ex)
result = {}
result["error_field"] = "call failed"
return json.dumps(result), 500
return jsonify(result), 500
finally:
if pooledConnection is not None:

View File

@ -2,6 +2,7 @@ import json
import logging
import traceback
import pydapper
from flask import jsonify
from ..schemas import model
from .. import local_db
@ -44,7 +45,7 @@ def GetShipcalls(options):
print(ex)
result = {}
result["error_field"] = "call failed"
return json.dumps(result), 500, {'Content-Type': 'application/json; charset=utf-8'}
return jsonify(result), 500
finally:
if pooledConnection is not None:
@ -178,7 +179,7 @@ def PostShipcalls(schemaModel):
print(ex)
result = {}
result["error_field"] = "call failed"
return json.dumps(result), 500, {'Content-Type': 'application/json; charset=utf-8'}
return jsonify(result), 500
finally:
if pooledConnection is not None:
@ -205,7 +206,7 @@ def PutShipcalls(schemaModel, original_payload=None):
theshipcall = commands.query_single_or_default("SELECT * FROM shipcall where id = ?id?", sentinel, param={"id" : schemaModel["id"]})
if theshipcall is sentinel:
return json.dumps("no such record"), 404, {'Content-Type': 'application/json; charset=utf-8'}
return jsonify("no such record"), 404
was_canceled = theshipcall["canceled"]
@ -325,7 +326,7 @@ def PutShipcalls(schemaModel, original_payload=None):
print(ex)
result = {}
result["error_field"] = "call failed"
return json.dumps(result), 500, {'Content-Type': 'application/json; charset=utf-8'}
return jsonify(result), 500
finally:
if pooledConnection is not None:

View File

@ -1,6 +1,7 @@
import json
import logging
import pydapper
from flask import jsonify
from ..schemas import model
from .. import local_db
@ -26,7 +27,7 @@ def GetShips(token):
print(ex)
result = {}
result["error_field"] = "call failed"
return json.dumps(result), 500, {'Content-Type': 'application/json; charset=utf-8'}
return jsonify(result), 500
finally:
if pooledConnection is not None:
@ -90,7 +91,7 @@ def PostShip(schemaModel):
print(ex)
result = {}
result["error_field"] = "call failed"
return json.dumps(result), 500, {'Content-Type': 'application/json; charset=utf-8'}
return jsonify(result), 500
finally:
if pooledConnection is not None:
pooledConnection.close()
@ -133,7 +134,7 @@ def PutShip(schemaModel):
print(ex)
result = {}
result["error_field"] = "call failed"
return json.dumps(result), 500, {'Content-Type': 'application/json; charset=utf-8'}
return jsonify(result), 500
finally:
if pooledConnection is not None:
pooledConnection.close()
@ -158,14 +159,14 @@ def DeleteShip(options):
result = {}
result["error_field"] = "no such record"
return json.dumps(result), 404, {'Content-Type': 'application/json; charset=utf-8'}
return jsonify(result), 404
except Exception as ex:
logging.error(ex)
print(ex)
result = {}
result["error_field"] = "call failed"
return json.dumps(result), 500, {'Content-Type': 'application/json; charset=utf-8'}
return jsonify(result), 500
finally:
if pooledConnection is not None:
pooledConnection.close()

View File

@ -2,6 +2,7 @@ import json
import logging
import traceback
import pydapper
from flask import jsonify
from enum import Enum, Flag
from ..schemas import model
@ -35,7 +36,7 @@ def GetTimes(options):
print(ex)
result = {}
result["error_field"] = "call failed"
return json.dumps(result), 500, {'Content-Type': 'application/json; charset=utf-8'}
return jsonify(result), 500
finally:
if pooledConnection is not None:
@ -108,7 +109,7 @@ def PostTimes(schemaModel):
print(ex)
result = {}
result["error_field"] = "call failed"
return json.dumps(result), 500, {'Content-Type': 'application/json; charset=utf-8'}
return jsonify(result), 500
finally:
if pooledConnection is not None:
@ -130,7 +131,7 @@ def PutTimes(schemaModel, original_payload=None):
sentinel = object()
existing_times = commands.query_single_or_default("SELECT * FROM times WHERE id = ?id?", sentinel, param={"id": schemaModel["id"]})
if existing_times is sentinel:
return json.dumps("no such record"), 404, {'Content-Type': 'application/json; charset=utf-8'}
return jsonify("no such record"), 404
provided_keys = set(original_payload.keys()) if isinstance(original_payload, dict) else None
@ -177,7 +178,7 @@ def PutTimes(schemaModel, original_payload=None):
print(ex)
result = {}
result["error_field"] = "call failed"
return json.dumps(result), 500, {'Content-Type': 'application/json; charset=utf-8'}
return jsonify(result), 500
finally:
if pooledConnection is not None:
@ -208,14 +209,14 @@ def DeleteTimes(options):
result = {}
result["error_field"] = "no such record"
return json.dumps(result), 404, {'Content-Type': 'application/json; charset=utf-8'}
return jsonify(result), 404
except Exception as ex:
logging.error(ex)
print(ex)
result = {}
result["error_field"] = "call failed"
return json.dumps(result), 500, {'Content-Type': 'application/json; charset=utf-8'}
return jsonify(result), 500
finally:
if pooledConnection is not None:

View File

@ -2,6 +2,7 @@ import json
import logging
import pydapper
import bcrypt
from flask import jsonify
from ..schemas import model
from .. import local_db
@ -27,7 +28,7 @@ def PutUser(schemaModel):
theuser = commands.query_single_or_default("SELECT * FROM user where id = ?id?", sentinel, param={"id" : schemaModel["id"]}, model=model.User)
if theuser is sentinel:
# #TODO: result = {"message":"no such record"} -> json.dumps
return json.dumps("no such record"), 404, {'Content-Type': 'application/json; charset=utf-8'}
return jsonify({"error_field": "no such record"}), 404
# see if we need to update public fields
# #TODO_determine: this filter blocks Put-Requests, which update the 'notify_email', 'notify_whatsapp', 'notify_signal', 'notify_popup' fields
@ -70,16 +71,16 @@ def PutUser(schemaModel):
else:
result = {}
result["error_field"] = "old password invalid"
return json.dumps(result), 400, {'Content-Type': 'application/json; charset=utf-8'}
return jsonify(result), 400
return json.dumps({"id" : schemaModel["id"]}), 200
return jsonify({"id" : schemaModel["id"]}), 200
except Exception as ex:
logging.error(ex)
print(ex)
result = {}
result["error_field"] = "call failed"
return json.dumps(result), 500, {'Content-Type': 'application/json; charset=utf-8'}
return jsonify(result), 500
finally:
if pooledConnection is not None:

View File

@ -24,6 +24,15 @@ def obj_dict(obj):
return obj.to_json()
return obj.__dict__
def _coerce_bool(value):
if value is None:
return None
if isinstance(value, bool):
return value
if isinstance(value, int) and value in (0, 1):
return bool(value)
return value
@dataclass
class Berth(Schema):
id: int
@ -36,6 +45,19 @@ class Berth(Schema):
modified: datetime
deleted: bool
def to_json(self):
return {
"id": self.id,
"name": self.name,
"lock": _coerce_bool(self.lock),
"owner_id": self.owner_id,
"authority_id": self.authority_id,
"port_id": self.port_id,
"created": self.created.isoformat() if self.created else "",
"modified": self.modified.isoformat() if self.modified else "",
"deleted": _coerce_bool(self.deleted),
}
@dataclass
class Port(Schema):
id: int
@ -45,6 +67,16 @@ class Port(Schema):
modified: datetime
deleted: bool
def to_json(self):
return {
"id": self.id,
"name": self.name,
"locode": self.locode,
"created": self.created.isoformat() if self.created else "",
"modified": self.modified.isoformat() if self.modified else "",
"deleted": _coerce_bool(self.deleted),
}
class OperationType(IntEnum):
undefined = 0
insert = 1
@ -212,6 +244,21 @@ class Participant(Schema):
deleted: bool
ports: List[int] = field(default_factory=list)
def to_json(self):
return {
"id": self.id,
"name": self.name,
"street": self.street,
"postal_code": self.postal_code,
"city": self.city,
"type": self.type,
"flags": self.flags,
"created": self.created.isoformat() if self.created else "",
"modified": self.modified.isoformat() if self.modified else "",
"deleted": _coerce_bool(self.deleted),
"ports": self.ports,
}
@validates("type")
def validate_type(self, value, **kwargs):
# e.g., when an IntFlag has the values 1,2,4; the maximum valid value is 7
@ -370,25 +417,25 @@ class Shipcall:
"etd": self.etd.isoformat() if self.etd else "",
"arrival_berth_id": self.arrival_berth_id,
"departure_berth_id": self.departure_berth_id,
"tug_required": self.tug_required,
"pilot_required": self.pilot_required,
"tug_required": _coerce_bool(self.tug_required),
"pilot_required": _coerce_bool(self.pilot_required),
"flags": self.flags,
"pier_side": self.pier_side,
"bunkering": self.bunkering,
"replenishing_terminal": self.replenishing_terminal,
"replenishing_lock": self.replenishing_lock,
"pier_side": _coerce_bool(self.pier_side),
"bunkering": _coerce_bool(self.bunkering),
"replenishing_terminal": _coerce_bool(self.replenishing_terminal),
"replenishing_lock": _coerce_bool(self.replenishing_lock),
"draft": self.draft,
"tidal_window_from": self.tidal_window_from.isoformat() if self.tidal_window_from else "",
"tidal_window_to": self.tidal_window_to.isoformat() if self.tidal_window_to else "",
"rain_sensitive_cargo": self.rain_sensitive_cargo,
"rain_sensitive_cargo": _coerce_bool(self.rain_sensitive_cargo),
"recommended_tugs": self.recommended_tugs,
"anchored": self.anchored,
"moored_lock": self.moored_lock,
"canceled": self.canceled,
"anchored": _coerce_bool(self.anchored),
"moored_lock": _coerce_bool(self.moored_lock),
"canceled": _coerce_bool(self.canceled),
"evaluation": self.evaluation.name if isinstance(self.evaluation, IntEnum) else EvaluationType(self.evaluation).name,
"evaluation_message": self.evaluation_message,
"evaluation_time": self.evaluation_time.isoformat() if self.evaluation_time else "",
"evaluation_notifications_sent": self.evaluation_notifications_sent,
"evaluation_notifications_sent": _coerce_bool(self.evaluation_notifications_sent),
"time_ref_point": self.time_ref_point,
"port_id": self.port_id,
"created": self.created.isoformat() if self.created else "",
@ -400,7 +447,39 @@ class Shipcall:
@classmethod
def from_query_row(self, id, ship_id, type, eta, voyage, etd, arrival_berth_id, departure_berth_id, tug_required, pilot_required, flags, pier_side, bunkering, replenishing_terminal, replenishing_lock, draft, tidal_window_from, tidal_window_to, rain_sensitive_cargo, recommended_tugs, anchored, moored_lock, canceled, evaluation, evaluation_message, evaluation_time, evaluation_notifications_sent, time_ref_point, port_id, created, modified):
return self(id, ship_id, ShipcallType(type), eta, voyage, etd, arrival_berth_id, departure_berth_id, tug_required, pilot_required, flags, pier_side, bunkering, replenishing_terminal, replenishing_lock, draft, tidal_window_from, tidal_window_to, rain_sensitive_cargo, recommended_tugs, anchored, moored_lock, canceled, EvaluationType(evaluation), evaluation_message, evaluation_time, evaluation_notifications_sent, time_ref_point, port_id, created, modified)
return self(
id,
ship_id,
ShipcallType(type),
eta,
voyage,
etd,
arrival_berth_id,
departure_berth_id,
_coerce_bool(tug_required),
_coerce_bool(pilot_required),
flags,
_coerce_bool(pier_side),
_coerce_bool(bunkering),
_coerce_bool(replenishing_terminal),
_coerce_bool(replenishing_lock),
draft,
tidal_window_from,
tidal_window_to,
_coerce_bool(rain_sensitive_cargo),
recommended_tugs,
_coerce_bool(anchored),
_coerce_bool(moored_lock),
_coerce_bool(canceled),
EvaluationType(evaluation),
evaluation_message,
evaluation_time,
_coerce_bool(evaluation_notifications_sent),
time_ref_point,
port_id,
created,
modified
)
class ShipcallId(Schema):
pass
@ -515,10 +594,10 @@ class UserSchema(Schema):
super().__init__(unknown=None)
pass
id = fields.Integer(required=True)
first_name = fields.String(allow_none=True, required=False, validate=[validate.Length(max=64)])
last_name = fields.String(allow_none=True, required=False, validate=[validate.Length(max=64)])
user_phone = fields.String(allow_none=True, required=False)
user_email = fields.String(allow_none=True, required=False, validate=[validate.Length(max=64)])
first_name = fields.String(allow_none=True, required=False, validate=[validate.Length(max=45)])
last_name = fields.String(allow_none=True, required=False, validate=[validate.Length(max=45)])
user_phone = fields.String(allow_none=True, required=False, validate=[validate.Length(max=128)])
user_email = fields.String(allow_none=True, required=False, validate=[validate.Length(max=128)])
old_password = fields.String(allow_none=True, required=False, validate=[validate.Length(max=128)])
new_password = fields.String(allow_none=True, required=False, validate=[validate.Length(min=6, max=128)])
notify_email = fields.Bool(allow_none=True, required=False)
@ -567,6 +646,34 @@ class Times:
created: datetime
modified: datetime
def to_json(self):
return {
"id": self.id,
"eta_berth": self.eta_berth.isoformat() if self.eta_berth else "",
"eta_berth_fixed": _coerce_bool(self.eta_berth_fixed),
"etd_berth": self.etd_berth.isoformat() if self.etd_berth else "",
"etd_berth_fixed": _coerce_bool(self.etd_berth_fixed),
"lock_time": self.lock_time.isoformat() if self.lock_time else "",
"lock_time_fixed": _coerce_bool(self.lock_time_fixed),
"zone_entry": self.zone_entry.isoformat() if self.zone_entry else "",
"zone_entry_fixed": _coerce_bool(self.zone_entry_fixed),
"operations_start": self.operations_start.isoformat() if self.operations_start else "",
"operations_end": self.operations_end.isoformat() if self.operations_end else "",
"remarks": self.remarks,
"participant_id": self.participant_id,
"berth_id": self.berth_id,
"berth_info": self.berth_info,
"pier_side": _coerce_bool(self.pier_side),
"participant_type": self.participant_type,
"shipcall_id": self.shipcall_id,
"ata": self.ata.isoformat() if self.ata else "",
"atd": self.atd.isoformat() if self.atd else "",
"eta_interval_end": self.eta_interval_end.isoformat() if self.eta_interval_end else "",
"etd_interval_end": self.etd_interval_end.isoformat() if self.etd_interval_end else "",
"created": self.created.isoformat() if self.created else "",
"modified": self.modified.isoformat() if self.modified else "",
}
@dataclass
class User:
@ -610,6 +717,23 @@ class Ship:
modified: datetime
deleted: bool
def to_json(self):
return {
"id": self.id,
"name": self.name,
"imo": self.imo,
"callsign": self.callsign,
"participant_id": self.participant_id,
"length": self.length,
"width": self.width,
"is_tug": _coerce_bool(self.is_tug),
"bollard_pull": self.bollard_pull,
"eni": self.eni,
"created": self.created.isoformat() if self.created else "",
"modified": self.modified.isoformat() if self.modified else "",
"deleted": _coerce_bool(self.deleted),
}
class ShipSchema(Schema):
def __init__(self):

View File

@ -1,5 +1,4 @@
import json
from flask import request
from flask import request, jsonify
from .jwt_handler import decode_jwt
def check_jwt():
@ -25,11 +24,11 @@ def auth_guard(role=None):
try:
user_data = check_jwt()
except Exception as e:
return json.dumps({"error_field" : f'{e}', "status": 401}), 401
return jsonify({"error_field" : f'{e}', "status": 401}), 401
if role and role not in user_data['roles']:
return json.dumps({"error_field": 'Authorization required.', "status" : 403}), 403
return jsonify({"error_field": 'Authorization required.', "status" : 403}), 403
# get on to original route
return route_function(*args, **kwargs)
decorated_function.__name__ = route_function.__name__
return decorated_function
return wrapper
return wrapper

View File

@ -2,6 +2,7 @@ import logging
import typing
import json
import sys
from flask import jsonify
from marshmallow import ValidationError
from werkzeug.exceptions import Forbidden
@ -29,7 +30,7 @@ def unbundle_(errors, unbundled=[]):
[{'error_field':'keya', 'error_description':['keyb']}, {'error_field':'keyc12', 'error_description':[12]}]
As can be seen, only the subkeys and their respective value are received. Each value is *always* a list.
"""
{k:unbundle_(v,unbundled=unbundled) if isinstance(v,dict) else unbundled.append({"error_field":k, "error_description":v[0] if isinstance(v,list) else str(v)}) for k,v in errors.items()}
{k:unbundle_(v,unbundled=unbundled) if isinstance(v,dict) else unbundled.append({"error_field":k, "error_description": str(v[0] if isinstance(v,list) else v)}) for k,v in errors.items()}
return
def unbundle_validation_error_message(message):
@ -40,8 +41,8 @@ def unbundle_validation_error_message(message):
unbundled = []
unbundle_(message, unbundled=unbundled)
if len(unbundled)>0:
error_field = "ValidationError in the following field(s): " + " & ".join([unb["error_field"] for unb in unbundled])
error_description = " " . join([unb["error_description"] for unb in unbundled])
error_field = "ValidationError in the following field(s): " + " & ".join([str(unb["error_field"]) for unb in unbundled])
error_description = " " . join([str(unb["error_description"]) for unb in unbundled])
else:
error_field = "ValidationError"
error_description = "unknown validation error"
@ -53,35 +54,28 @@ def create_validation_error_response(ex:ValidationError, status_code:int=400, cr
(error_field, error_description) = unbundle_validation_error_message(message)
json_response = create_default_json_response_format(error_field=error_field, error_description=error_description)
# json.dumps with default=str automatically converts non-serializable values to strings. Hence, datetime objects (which are not)
# natively serializable are properly serialized.
serialized_response = json.dumps(json_response, default=str)
if create_log:
logging.warning(ex) if ex is not None else logging.warning(message)
# print(ex) if ex is not None else print(message)
return (serialized_response, status_code)
return (jsonify(json_response), status_code)
def create_werkzeug_error_response(ex:Forbidden, status_code:int=403, create_log:bool=True)->typing.Tuple[str,int]:
# json.dumps with default=str automatically converts non-serializable values to strings. Hence, datetime objects (which are not)
# natively serializable are properly serialized.
message = ex.description
json_response = create_default_json_response_format(error_field=str(repr(ex)), error_description=message)
serialized_response = json.dumps(json_response, default=str)
if create_log:
logging.warning(ex) if ex is not None else logging.warning(message)
# print(ex) if ex is not None else print(message)
return serialized_response, status_code
return jsonify(json_response), status_code
def create_dynamic_exception_response(ex, status_code:int=400, message:typing.Optional[str]=None, create_log:bool=True):
message = repr(ex) if message is None else message
json_response = create_default_json_response_format(error_field="Exception", error_description=message)
json_response["error_field"] = "call failed"
serialized_response = json.dumps(json_response, default=str)
if create_log:
logging.warning(ex) if ex is not None else logging.warning(message)
# print(ex) if ex is not None else print(message)
return (serialized_response, status_code)
return (jsonify(json_response), status_code)

View File

@ -7,7 +7,7 @@ def base_url() -> str:
# Example: https://dev.api.mycompany.com
url = os.environ.get("API_BASE_URL")
if not url:
url = "http://neptun.fritz.box"
url = "http://127.0.0.1:5000"
# raise RuntimeError("Set API_BASE_URL")
return url.rstrip("/")

View File

@ -1,6 +1,15 @@
import os
import pytest
import schemathesis
schema = schemathesis.openapi.from_path("../../../misc/BreCalApi.yaml")
schema = schemathesis.openapi.from_path(
"../../../misc/BreCalApi.yaml",
)
schema.base_url = (os.environ.get("API_BASE_URL") or "http://127.0.0.1:5000").rstrip("/")
@pytest.fixture(scope="session", autouse=True)
def _set_schema_base_url(base_url: str) -> None:
schema.base_url = base_url
@schema.parametrize()
def test_api_conformance(
@ -9,9 +18,13 @@ def test_api_conformance(
auth_headers: dict[str, str],
login_payload: dict[str, str],
) -> None:
case.operation.schema.base_url = base_url
transport = getattr(case.operation.schema, "transport", None)
if transport is not None and hasattr(transport, "base_url"):
transport.base_url = base_url
# Calls your real service:
if case.path == "/login" and case.method.upper() == "POST":
response = case.call(base_url=base_url, json=login_payload)
response = case.call(base_url=base_url)
else:
response = case.call(base_url=base_url, headers=auth_headers)
# Validates status code, headers, and body against the OpenAPI schema: