Fixed boolean returns, first stage of schemathesis tests fixed, still 17 open

This commit is contained in:
Daniel Schick 2026-01-09 13:11:12 +01:00
parent f84b3fd7d1
commit 0ecc6aaefe
19 changed files with 106 additions and 55 deletions

2
.gitignore vendored
View File

@ -289,3 +289,5 @@ cython_debug/
# option (not recommended) you can uncomment the following to ignore the entire idea folder. # option (not recommended) you can uncomment the following to ignore the entire idea folder.
src/notebooks_metz/ src/notebooks_metz/
src/server/editable_requirements.txt src/server/editable_requirements.txt
schemathesis_report.html

View File

@ -95,6 +95,14 @@ paths:
$ref: '#/components/responses/400' $ref: '#/components/responses/400'
'401': '401':
$ref: '#/components/responses/401' $ref: '#/components/responses/401'
'404':
description: Not found
content:
application/json:
schema:
$ref: '#/components/schemas/Error'
example:
error_field: no such record
'500': '500':
$ref: '#/components/responses/500' $ref: '#/components/responses/500'
'503': '503':
@ -800,10 +808,14 @@ components:
properties: properties:
username: username:
type: string type: string
minLength: 1
pattern: "\\S"
example: alfred example: alfred
password: password:
type: string type: string
format: password format: password
minLength: 1
pattern: "\\S"
example: '123456' example: '123456'
required: required:
- username - username
@ -1701,19 +1713,25 @@ components:
example: 5 example: 5
first_name: first_name:
type: string type: string
maxLength: 45
example: John example: John
last_name: last_name:
type: string type: string
maxLength: 45
example: Doe example: Doe
user_name: user_name:
type: string type: string
maxLength: 45
example: johndoe example: johndoe
user_phone: user_phone:
type: string type: string
nullable: true nullable: true
maxLength: 32
example: '1234567890' example: '1234567890'
user_email: user_email:
type: string type: string
format: email
maxLength: 64
nullable: true nullable: true
example: no@where.com example: no@where.com
notify_email: notify_email:

View File

@ -19,3 +19,9 @@ Das Ganze funktioniert nur, wenn auch schemathesis und hypothesis in den passend
Aktuell habe ich schemathesis ("latest") und hypothesis 6.120.0: Aktuell habe ich schemathesis ("latest") und hypothesis 6.120.0:
```pip install "hypothesis==6.120.0"``` ```pip install "hypothesis==6.120.0"```
Das muss wegen dependencies so blöd gepinnt werden. Das muss wegen dependencies so blöd gepinnt werden.
Damit pytest die API findet muss API_BASE_URL als Umgebungsvariable gesetzt werden. In Powershell z.B. so:
```powershell
$env:API_BASE_URL = "http://localhost:5000"
```

View File

@ -1,4 +1,4 @@
from flask import Blueprint, request from flask import Blueprint, request, jsonify
from flask_jwt_extended import create_access_token from flask_jwt_extended import create_access_token
from webargs.flaskparser import parser from webargs.flaskparser import parser
from ..schemas import model from ..schemas import model
@ -13,4 +13,14 @@ bp = Blueprint('login', __name__)
def Logon(): def Logon():
options = request.get_json(force=True) options = request.get_json(force=True)
if not isinstance(options, dict):
return jsonify({"error_field": "invalid request body"}), 400
username = options.get("username")
password = options.get("password")
if not username or not password:
return jsonify({"error_field": "username and password required"}), 400
if not isinstance(username, str) or not isinstance(password, str):
return jsonify({"error_field": "username and password must be strings"}), 400
if not username.strip() or not password.strip():
return jsonify({"error_field": "username and password cannot be empty"}), 400
return impl.login.GetUser(options) return impl.login.GetUser(options)

View File

@ -1,6 +1,7 @@
import json import json
import logging import logging
import pydapper import pydapper
from flask import jsonify
from ..schemas import model from ..schemas import model
from .. import local_db from .. import local_db
@ -33,7 +34,7 @@ def GetBerths(options):
print(ex) print(ex)
result = {} result = {}
result["error_field"] = "call failed" result["error_field"] = "call failed"
return json.dumps(result), 500 return jsonify(result), 500
finally: finally:
if pooledConnection is not None: if pooledConnection is not None:

View File

@ -2,6 +2,7 @@ import json
import logging import logging
import pydapper import pydapper
import pdb import pdb
from flask import jsonify
from ..schemas import model from ..schemas import model
from ..schemas.model import History from ..schemas.model import History
@ -34,7 +35,7 @@ def GetHistory(options):
print(ex) print(ex)
result = {} result = {}
result["error_field"] = "call failed" result["error_field"] = "call failed"
return json.dumps("call failed"), 500 return jsonify("call failed"), 500
finally: finally:
if pooledConnection is not None: if pooledConnection is not None:
pooledConnection.close() pooledConnection.close()

View File

@ -1,7 +1,7 @@
import json
import logging import logging
import pydapper import pydapper
import bcrypt import bcrypt
from flask import jsonify
from ..schemas import model from ..schemas import model
from .. import local_db from .. import local_db
@ -43,16 +43,16 @@ def GetUser(options):
} }
token = jwt_handler.generate_jwt(payload=result, lifetime=120) # generate token valid 60 mins token = jwt_handler.generate_jwt(payload=result, lifetime=120) # generate token valid 60 mins
result["token"] = token # add token to user data result["token"] = token # add token to user data
return json.dumps(result), 200, {'Content-Type': 'application/json; charset=utf-8'} return jsonify(result), 200
if len(data) > 1: if len(data) > 1:
result = {} result = {}
result["error_field"] = "credential lookup mismatch" result["error_field"] = "credential lookup mismatch"
return json.dumps(result), 500, {'Content-Type': 'application/json; charset=utf-8'} return jsonify(result), 500
result = {} result = {}
result["error_field"] = "invalid credentials" result["error_field"] = "invalid credentials"
return json.dumps(result), 403, {'Content-Type': 'application/json; charset=utf-8'} return jsonify(result), 403
except Exception as ex: except Exception as ex:
logging.error(ex) logging.error(ex)
@ -60,7 +60,7 @@ def GetUser(options):
result = {} result = {}
result["error_field"] = "call failed" result["error_field"] = "call failed"
result["error_description"] = str(ex) result["error_description"] = str(ex)
return json.dumps(result), 500, {'Content-Type': 'application/json; charset=utf-8'} return jsonify(result), 500
finally: finally:
if pooledConnection is not None: if pooledConnection is not None:

View File

@ -1,6 +1,7 @@
import json import json
import logging import logging
import pydapper import pydapper
from flask import jsonify
from ..schemas import model from ..schemas import model
from .. import local_db from .. import local_db
@ -28,7 +29,7 @@ def GetNotifications(token, participant_id=None):
print(ex) print(ex)
result = {} result = {}
result["error_field"] = "call failed" result["error_field"] = "call failed"
return json.dumps(result), 500, {'Content-Type': 'application/json; charset=utf-8'} return jsonify(result), 500
finally: finally:
if pooledConnection is not None: if pooledConnection is not None:
pooledConnection.close() pooledConnection.close()

View File

@ -1,6 +1,7 @@
import json import json
import logging import logging
import pydapper import pydapper
from flask import jsonify
from ..schemas import model from ..schemas import model
from .. import local_db from .. import local_db
@ -59,7 +60,7 @@ def GetParticipant(options):
print(ex) print(ex)
result = {} result = {}
result["error_field"] = "call failed" result["error_field"] = "call failed"
return json.dumps("call failed"), 500 return jsonify("call failed"), 500
finally: finally:
if pooledConnection is not None: if pooledConnection is not None:

View File

@ -1,6 +1,7 @@
import json import json
import logging import logging
import pydapper import pydapper
from flask import jsonify
from ..schemas import model from ..schemas import model
from .. import local_db from .. import local_db
@ -23,7 +24,7 @@ def GetPorts(token):
print(ex) print(ex)
result = {} result = {}
result["error_field"] = "call failed" result["error_field"] = "call failed"
return json.dumps(result), 500 return jsonify(result), 500
finally: finally:
if pooledConnection is not None: if pooledConnection is not None:

View File

@ -2,6 +2,7 @@ import json
import logging import logging
import traceback import traceback
import pydapper import pydapper
from flask import jsonify
from ..schemas import model from ..schemas import model
from .. import local_db from .. import local_db
@ -44,7 +45,7 @@ def GetShipcalls(options):
print(ex) print(ex)
result = {} result = {}
result["error_field"] = "call failed" result["error_field"] = "call failed"
return json.dumps(result), 500, {'Content-Type': 'application/json; charset=utf-8'} return jsonify(result), 500
finally: finally:
if pooledConnection is not None: if pooledConnection is not None:
@ -178,7 +179,7 @@ def PostShipcalls(schemaModel):
print(ex) print(ex)
result = {} result = {}
result["error_field"] = "call failed" result["error_field"] = "call failed"
return json.dumps(result), 500, {'Content-Type': 'application/json; charset=utf-8'} return jsonify(result), 500
finally: finally:
if pooledConnection is not None: if pooledConnection is not None:
@ -205,7 +206,7 @@ def PutShipcalls(schemaModel, original_payload=None):
theshipcall = commands.query_single_or_default("SELECT * FROM shipcall where id = ?id?", sentinel, param={"id" : schemaModel["id"]}) theshipcall = commands.query_single_or_default("SELECT * FROM shipcall where id = ?id?", sentinel, param={"id" : schemaModel["id"]})
if theshipcall is sentinel: if theshipcall is sentinel:
return json.dumps("no such record"), 404, {'Content-Type': 'application/json; charset=utf-8'} return jsonify("no such record"), 404
was_canceled = theshipcall["canceled"] was_canceled = theshipcall["canceled"]
@ -325,7 +326,7 @@ def PutShipcalls(schemaModel, original_payload=None):
print(ex) print(ex)
result = {} result = {}
result["error_field"] = "call failed" result["error_field"] = "call failed"
return json.dumps(result), 500, {'Content-Type': 'application/json; charset=utf-8'} return jsonify(result), 500
finally: finally:
if pooledConnection is not None: if pooledConnection is not None:

View File

@ -1,6 +1,7 @@
import json import json
import logging import logging
import pydapper import pydapper
from flask import jsonify
from ..schemas import model from ..schemas import model
from .. import local_db from .. import local_db
@ -26,7 +27,7 @@ def GetShips(token):
print(ex) print(ex)
result = {} result = {}
result["error_field"] = "call failed" result["error_field"] = "call failed"
return json.dumps(result), 500, {'Content-Type': 'application/json; charset=utf-8'} return jsonify(result), 500
finally: finally:
if pooledConnection is not None: if pooledConnection is not None:
@ -90,7 +91,7 @@ def PostShip(schemaModel):
print(ex) print(ex)
result = {} result = {}
result["error_field"] = "call failed" result["error_field"] = "call failed"
return json.dumps(result), 500, {'Content-Type': 'application/json; charset=utf-8'} return jsonify(result), 500
finally: finally:
if pooledConnection is not None: if pooledConnection is not None:
pooledConnection.close() pooledConnection.close()
@ -133,7 +134,7 @@ def PutShip(schemaModel):
print(ex) print(ex)
result = {} result = {}
result["error_field"] = "call failed" result["error_field"] = "call failed"
return json.dumps(result), 500, {'Content-Type': 'application/json; charset=utf-8'} return jsonify(result), 500
finally: finally:
if pooledConnection is not None: if pooledConnection is not None:
pooledConnection.close() pooledConnection.close()
@ -158,14 +159,14 @@ def DeleteShip(options):
result = {} result = {}
result["error_field"] = "no such record" result["error_field"] = "no such record"
return json.dumps(result), 404, {'Content-Type': 'application/json; charset=utf-8'} return jsonify(result), 404
except Exception as ex: except Exception as ex:
logging.error(ex) logging.error(ex)
print(ex) print(ex)
result = {} result = {}
result["error_field"] = "call failed" result["error_field"] = "call failed"
return json.dumps(result), 500, {'Content-Type': 'application/json; charset=utf-8'} return jsonify(result), 500
finally: finally:
if pooledConnection is not None: if pooledConnection is not None:
pooledConnection.close() pooledConnection.close()

View File

@ -2,6 +2,7 @@ import json
import logging import logging
import traceback import traceback
import pydapper import pydapper
from flask import jsonify
from enum import Enum, Flag from enum import Enum, Flag
from ..schemas import model from ..schemas import model
@ -35,7 +36,7 @@ def GetTimes(options):
print(ex) print(ex)
result = {} result = {}
result["error_field"] = "call failed" result["error_field"] = "call failed"
return json.dumps(result), 500, {'Content-Type': 'application/json; charset=utf-8'} return jsonify(result), 500
finally: finally:
if pooledConnection is not None: if pooledConnection is not None:
@ -108,7 +109,7 @@ def PostTimes(schemaModel):
print(ex) print(ex)
result = {} result = {}
result["error_field"] = "call failed" result["error_field"] = "call failed"
return json.dumps(result), 500, {'Content-Type': 'application/json; charset=utf-8'} return jsonify(result), 500
finally: finally:
if pooledConnection is not None: if pooledConnection is not None:
@ -130,7 +131,7 @@ def PutTimes(schemaModel, original_payload=None):
sentinel = object() sentinel = object()
existing_times = commands.query_single_or_default("SELECT * FROM times WHERE id = ?id?", sentinel, param={"id": schemaModel["id"]}) existing_times = commands.query_single_or_default("SELECT * FROM times WHERE id = ?id?", sentinel, param={"id": schemaModel["id"]})
if existing_times is sentinel: if existing_times is sentinel:
return json.dumps("no such record"), 404, {'Content-Type': 'application/json; charset=utf-8'} return jsonify("no such record"), 404
provided_keys = set(original_payload.keys()) if isinstance(original_payload, dict) else None provided_keys = set(original_payload.keys()) if isinstance(original_payload, dict) else None
@ -177,7 +178,7 @@ def PutTimes(schemaModel, original_payload=None):
print(ex) print(ex)
result = {} result = {}
result["error_field"] = "call failed" result["error_field"] = "call failed"
return json.dumps(result), 500, {'Content-Type': 'application/json; charset=utf-8'} return jsonify(result), 500
finally: finally:
if pooledConnection is not None: if pooledConnection is not None:
@ -208,14 +209,14 @@ def DeleteTimes(options):
result = {} result = {}
result["error_field"] = "no such record" result["error_field"] = "no such record"
return json.dumps(result), 404, {'Content-Type': 'application/json; charset=utf-8'} return jsonify(result), 404
except Exception as ex: except Exception as ex:
logging.error(ex) logging.error(ex)
print(ex) print(ex)
result = {} result = {}
result["error_field"] = "call failed" result["error_field"] = "call failed"
return json.dumps(result), 500, {'Content-Type': 'application/json; charset=utf-8'} return jsonify(result), 500
finally: finally:
if pooledConnection is not None: if pooledConnection is not None:

View File

@ -2,6 +2,7 @@ import json
import logging import logging
import pydapper import pydapper
import bcrypt import bcrypt
from flask import jsonify
from ..schemas import model from ..schemas import model
from .. import local_db from .. import local_db
@ -27,7 +28,7 @@ def PutUser(schemaModel):
theuser = commands.query_single_or_default("SELECT * FROM user where id = ?id?", sentinel, param={"id" : schemaModel["id"]}, model=model.User) theuser = commands.query_single_or_default("SELECT * FROM user where id = ?id?", sentinel, param={"id" : schemaModel["id"]}, model=model.User)
if theuser is sentinel: if theuser is sentinel:
# #TODO: result = {"message":"no such record"} -> json.dumps # #TODO: result = {"message":"no such record"} -> json.dumps
return json.dumps("no such record"), 404, {'Content-Type': 'application/json; charset=utf-8'} return jsonify({"error_field": "no such record"}), 404
# see if we need to update public fields # see if we need to update public fields
# #TODO_determine: this filter blocks Put-Requests, which update the 'notify_email', 'notify_whatsapp', 'notify_signal', 'notify_popup' fields # #TODO_determine: this filter blocks Put-Requests, which update the 'notify_email', 'notify_whatsapp', 'notify_signal', 'notify_popup' fields
@ -70,16 +71,16 @@ def PutUser(schemaModel):
else: else:
result = {} result = {}
result["error_field"] = "old password invalid" result["error_field"] = "old password invalid"
return json.dumps(result), 400, {'Content-Type': 'application/json; charset=utf-8'} return jsonify(result), 400
return json.dumps({"id" : schemaModel["id"]}), 200 return jsonify({"id" : schemaModel["id"]}), 200
except Exception as ex: except Exception as ex:
logging.error(ex) logging.error(ex)
print(ex) print(ex)
result = {} result = {}
result["error_field"] = "call failed" result["error_field"] = "call failed"
return json.dumps(result), 500, {'Content-Type': 'application/json; charset=utf-8'} return jsonify(result), 500
finally: finally:
if pooledConnection is not None: if pooledConnection is not None:

View File

@ -594,10 +594,10 @@ class UserSchema(Schema):
super().__init__(unknown=None) super().__init__(unknown=None)
pass pass
id = fields.Integer(required=True) id = fields.Integer(required=True)
first_name = fields.String(allow_none=True, required=False, validate=[validate.Length(max=64)]) first_name = fields.String(allow_none=True, required=False, validate=[validate.Length(max=45)])
last_name = fields.String(allow_none=True, required=False, validate=[validate.Length(max=64)]) last_name = fields.String(allow_none=True, required=False, validate=[validate.Length(max=45)])
user_phone = fields.String(allow_none=True, required=False) user_phone = fields.String(allow_none=True, required=False, validate=[validate.Length(max=128)])
user_email = fields.String(allow_none=True, required=False, validate=[validate.Length(max=64)]) user_email = fields.String(allow_none=True, required=False, validate=[validate.Length(max=128)])
old_password = fields.String(allow_none=True, required=False, validate=[validate.Length(max=128)]) old_password = fields.String(allow_none=True, required=False, validate=[validate.Length(max=128)])
new_password = fields.String(allow_none=True, required=False, validate=[validate.Length(min=6, max=128)]) new_password = fields.String(allow_none=True, required=False, validate=[validate.Length(min=6, max=128)])
notify_email = fields.Bool(allow_none=True, required=False) notify_email = fields.Bool(allow_none=True, required=False)

View File

@ -1,5 +1,4 @@
import json from flask import request, jsonify
from flask import request
from .jwt_handler import decode_jwt from .jwt_handler import decode_jwt
def check_jwt(): def check_jwt():
@ -25,9 +24,9 @@ def auth_guard(role=None):
try: try:
user_data = check_jwt() user_data = check_jwt()
except Exception as e: except Exception as e:
return json.dumps({"error_field" : f'{e}', "status": 401}), 401 return jsonify({"error_field" : f'{e}', "status": 401}), 401
if role and role not in user_data['roles']: if role and role not in user_data['roles']:
return json.dumps({"error_field": 'Authorization required.', "status" : 403}), 403 return jsonify({"error_field": 'Authorization required.', "status" : 403}), 403
# get on to original route # get on to original route
return route_function(*args, **kwargs) return route_function(*args, **kwargs)
decorated_function.__name__ = route_function.__name__ decorated_function.__name__ = route_function.__name__

View File

@ -2,6 +2,7 @@ import logging
import typing import typing
import json import json
import sys import sys
from flask import jsonify
from marshmallow import ValidationError from marshmallow import ValidationError
from werkzeug.exceptions import Forbidden from werkzeug.exceptions import Forbidden
@ -29,7 +30,7 @@ def unbundle_(errors, unbundled=[]):
[{'error_field':'keya', 'error_description':['keyb']}, {'error_field':'keyc12', 'error_description':[12]}] [{'error_field':'keya', 'error_description':['keyb']}, {'error_field':'keyc12', 'error_description':[12]}]
As can be seen, only the subkeys and their respective value are received. Each value is *always* a list. As can be seen, only the subkeys and their respective value are received. Each value is *always* a list.
""" """
{k:unbundle_(v,unbundled=unbundled) if isinstance(v,dict) else unbundled.append({"error_field":k, "error_description":v[0] if isinstance(v,list) else str(v)}) for k,v in errors.items()} {k:unbundle_(v,unbundled=unbundled) if isinstance(v,dict) else unbundled.append({"error_field":k, "error_description": str(v[0] if isinstance(v,list) else v)}) for k,v in errors.items()}
return return
def unbundle_validation_error_message(message): def unbundle_validation_error_message(message):
@ -40,8 +41,8 @@ def unbundle_validation_error_message(message):
unbundled = [] unbundled = []
unbundle_(message, unbundled=unbundled) unbundle_(message, unbundled=unbundled)
if len(unbundled)>0: if len(unbundled)>0:
error_field = "ValidationError in the following field(s): " + " & ".join([unb["error_field"] for unb in unbundled]) error_field = "ValidationError in the following field(s): " + " & ".join([str(unb["error_field"]) for unb in unbundled])
error_description = " " . join([unb["error_description"] for unb in unbundled]) error_description = " " . join([str(unb["error_description"]) for unb in unbundled])
else: else:
error_field = "ValidationError" error_field = "ValidationError"
error_description = "unknown validation error" error_description = "unknown validation error"
@ -53,35 +54,28 @@ def create_validation_error_response(ex:ValidationError, status_code:int=400, cr
(error_field, error_description) = unbundle_validation_error_message(message) (error_field, error_description) = unbundle_validation_error_message(message)
json_response = create_default_json_response_format(error_field=error_field, error_description=error_description) json_response = create_default_json_response_format(error_field=error_field, error_description=error_description)
# json.dumps with default=str automatically converts non-serializable values to strings. Hence, datetime objects (which are not)
# natively serializable are properly serialized.
serialized_response = json.dumps(json_response, default=str)
if create_log: if create_log:
logging.warning(ex) if ex is not None else logging.warning(message) logging.warning(ex) if ex is not None else logging.warning(message)
# print(ex) if ex is not None else print(message) # print(ex) if ex is not None else print(message)
return (serialized_response, status_code) return (jsonify(json_response), status_code)
def create_werkzeug_error_response(ex:Forbidden, status_code:int=403, create_log:bool=True)->typing.Tuple[str,int]: def create_werkzeug_error_response(ex:Forbidden, status_code:int=403, create_log:bool=True)->typing.Tuple[str,int]:
# json.dumps with default=str automatically converts non-serializable values to strings. Hence, datetime objects (which are not) # json.dumps with default=str automatically converts non-serializable values to strings. Hence, datetime objects (which are not)
# natively serializable are properly serialized. # natively serializable are properly serialized.
message = ex.description message = ex.description
json_response = create_default_json_response_format(error_field=str(repr(ex)), error_description=message) json_response = create_default_json_response_format(error_field=str(repr(ex)), error_description=message)
serialized_response = json.dumps(json_response, default=str)
if create_log: if create_log:
logging.warning(ex) if ex is not None else logging.warning(message) logging.warning(ex) if ex is not None else logging.warning(message)
# print(ex) if ex is not None else print(message) # print(ex) if ex is not None else print(message)
return serialized_response, status_code return jsonify(json_response), status_code
def create_dynamic_exception_response(ex, status_code:int=400, message:typing.Optional[str]=None, create_log:bool=True): def create_dynamic_exception_response(ex, status_code:int=400, message:typing.Optional[str]=None, create_log:bool=True):
message = repr(ex) if message is None else message message = repr(ex) if message is None else message
json_response = create_default_json_response_format(error_field="Exception", error_description=message) json_response = create_default_json_response_format(error_field="Exception", error_description=message)
json_response["error_field"] = "call failed" json_response["error_field"] = "call failed"
serialized_response = json.dumps(json_response, default=str)
if create_log: if create_log:
logging.warning(ex) if ex is not None else logging.warning(message) logging.warning(ex) if ex is not None else logging.warning(message)
# print(ex) if ex is not None else print(message) # print(ex) if ex is not None else print(message)
return (serialized_response, status_code) return (jsonify(json_response), status_code)

View File

@ -7,7 +7,7 @@ def base_url() -> str:
# Example: https://dev.api.mycompany.com # Example: https://dev.api.mycompany.com
url = os.environ.get("API_BASE_URL") url = os.environ.get("API_BASE_URL")
if not url: if not url:
url = "http://neptun.fritz.box" url = "http://127.0.0.1:5000"
# raise RuntimeError("Set API_BASE_URL") # raise RuntimeError("Set API_BASE_URL")
return url.rstrip("/") return url.rstrip("/")

View File

@ -1,6 +1,15 @@
import os
import pytest
import schemathesis import schemathesis
schema = schemathesis.openapi.from_path("../../../misc/BreCalApi.yaml") schema = schemathesis.openapi.from_path(
"../../../misc/BreCalApi.yaml",
)
schema.base_url = (os.environ.get("API_BASE_URL") or "http://127.0.0.1:5000").rstrip("/")
@pytest.fixture(scope="session", autouse=True)
def _set_schema_base_url(base_url: str) -> None:
schema.base_url = base_url
@schema.parametrize() @schema.parametrize()
def test_api_conformance( def test_api_conformance(
@ -9,9 +18,13 @@ def test_api_conformance(
auth_headers: dict[str, str], auth_headers: dict[str, str],
login_payload: dict[str, str], login_payload: dict[str, str],
) -> None: ) -> None:
case.operation.schema.base_url = base_url
transport = getattr(case.operation.schema, "transport", None)
if transport is not None and hasattr(transport, "base_url"):
transport.base_url = base_url
# Calls your real service: # Calls your real service:
if case.path == "/login" and case.method.upper() == "POST": if case.path == "/login" and case.method.upper() == "POST":
response = case.call(base_url=base_url, json=login_payload) response = case.call(base_url=base_url)
else: else:
response = case.call(base_url=base_url, headers=auth_headers) response = case.call(base_url=base_url, headers=auth_headers)
# Validates status code, headers, and body against the OpenAPI schema: # Validates status code, headers, and body against the OpenAPI schema: