Compare commits
No commits in common. "0ecc6aaefec8ab9e42318a8505a32617b7b86849" and "861b59286427f597ac2d36559148916072805cde" have entirely different histories.
0ecc6aaefe
...
861b592864
2
.gitignore
vendored
2
.gitignore
vendored
@ -289,5 +289,3 @@ cython_debug/
|
|||||||
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
|
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
|
||||||
src/notebooks_metz/
|
src/notebooks_metz/
|
||||||
src/server/editable_requirements.txt
|
src/server/editable_requirements.txt
|
||||||
|
|
||||||
schemathesis_report.html
|
|
||||||
@ -95,14 +95,6 @@ paths:
|
|||||||
$ref: '#/components/responses/400'
|
$ref: '#/components/responses/400'
|
||||||
'401':
|
'401':
|
||||||
$ref: '#/components/responses/401'
|
$ref: '#/components/responses/401'
|
||||||
'404':
|
|
||||||
description: Not found
|
|
||||||
content:
|
|
||||||
application/json:
|
|
||||||
schema:
|
|
||||||
$ref: '#/components/schemas/Error'
|
|
||||||
example:
|
|
||||||
error_field: no such record
|
|
||||||
'500':
|
'500':
|
||||||
$ref: '#/components/responses/500'
|
$ref: '#/components/responses/500'
|
||||||
'503':
|
'503':
|
||||||
@ -808,14 +800,10 @@ components:
|
|||||||
properties:
|
properties:
|
||||||
username:
|
username:
|
||||||
type: string
|
type: string
|
||||||
minLength: 1
|
|
||||||
pattern: "\\S"
|
|
||||||
example: alfred
|
example: alfred
|
||||||
password:
|
password:
|
||||||
type: string
|
type: string
|
||||||
format: password
|
format: password
|
||||||
minLength: 1
|
|
||||||
pattern: "\\S"
|
|
||||||
example: '123456'
|
example: '123456'
|
||||||
required:
|
required:
|
||||||
- username
|
- username
|
||||||
@ -1713,25 +1701,19 @@ components:
|
|||||||
example: 5
|
example: 5
|
||||||
first_name:
|
first_name:
|
||||||
type: string
|
type: string
|
||||||
maxLength: 45
|
|
||||||
example: John
|
example: John
|
||||||
last_name:
|
last_name:
|
||||||
type: string
|
type: string
|
||||||
maxLength: 45
|
|
||||||
example: Doe
|
example: Doe
|
||||||
user_name:
|
user_name:
|
||||||
type: string
|
type: string
|
||||||
maxLength: 45
|
|
||||||
example: johndoe
|
example: johndoe
|
||||||
user_phone:
|
user_phone:
|
||||||
type: string
|
type: string
|
||||||
nullable: true
|
nullable: true
|
||||||
maxLength: 32
|
|
||||||
example: '1234567890'
|
example: '1234567890'
|
||||||
user_email:
|
user_email:
|
||||||
type: string
|
type: string
|
||||||
format: email
|
|
||||||
maxLength: 64
|
|
||||||
nullable: true
|
nullable: true
|
||||||
example: no@where.com
|
example: no@where.com
|
||||||
notify_email:
|
notify_email:
|
||||||
|
|||||||
@ -19,9 +19,3 @@ Das Ganze funktioniert nur, wenn auch schemathesis und hypothesis in den passend
|
|||||||
Aktuell habe ich schemathesis ("latest") und hypothesis 6.120.0:
|
Aktuell habe ich schemathesis ("latest") und hypothesis 6.120.0:
|
||||||
```pip install "hypothesis==6.120.0"```
|
```pip install "hypothesis==6.120.0"```
|
||||||
Das muss wegen dependencies so blöd gepinnt werden.
|
Das muss wegen dependencies so blöd gepinnt werden.
|
||||||
|
|
||||||
Damit pytest die API findet muss API_BASE_URL als Umgebungsvariable gesetzt werden. In Powershell z.B. so:
|
|
||||||
|
|
||||||
```powershell
|
|
||||||
$env:API_BASE_URL = "http://localhost:5000"
|
|
||||||
```
|
|
||||||
|
|||||||
@ -1,4 +1,4 @@
|
|||||||
from flask import Blueprint, request, jsonify
|
from flask import Blueprint, request
|
||||||
from flask_jwt_extended import create_access_token
|
from flask_jwt_extended import create_access_token
|
||||||
from webargs.flaskparser import parser
|
from webargs.flaskparser import parser
|
||||||
from ..schemas import model
|
from ..schemas import model
|
||||||
@ -13,14 +13,4 @@ bp = Blueprint('login', __name__)
|
|||||||
def Logon():
|
def Logon():
|
||||||
|
|
||||||
options = request.get_json(force=True)
|
options = request.get_json(force=True)
|
||||||
if not isinstance(options, dict):
|
|
||||||
return jsonify({"error_field": "invalid request body"}), 400
|
|
||||||
username = options.get("username")
|
|
||||||
password = options.get("password")
|
|
||||||
if not username or not password:
|
|
||||||
return jsonify({"error_field": "username and password required"}), 400
|
|
||||||
if not isinstance(username, str) or not isinstance(password, str):
|
|
||||||
return jsonify({"error_field": "username and password must be strings"}), 400
|
|
||||||
if not username.strip() or not password.strip():
|
|
||||||
return jsonify({"error_field": "username and password cannot be empty"}), 400
|
|
||||||
return impl.login.GetUser(options)
|
return impl.login.GetUser(options)
|
||||||
|
|||||||
@ -1,7 +1,6 @@
|
|||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
import pydapper
|
import pydapper
|
||||||
from flask import jsonify
|
|
||||||
|
|
||||||
from ..schemas import model
|
from ..schemas import model
|
||||||
from .. import local_db
|
from .. import local_db
|
||||||
@ -34,7 +33,7 @@ def GetBerths(options):
|
|||||||
print(ex)
|
print(ex)
|
||||||
result = {}
|
result = {}
|
||||||
result["error_field"] = "call failed"
|
result["error_field"] = "call failed"
|
||||||
return jsonify(result), 500
|
return json.dumps(result), 500
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
if pooledConnection is not None:
|
if pooledConnection is not None:
|
||||||
|
|||||||
@ -2,7 +2,6 @@ import json
|
|||||||
import logging
|
import logging
|
||||||
import pydapper
|
import pydapper
|
||||||
import pdb
|
import pdb
|
||||||
from flask import jsonify
|
|
||||||
|
|
||||||
from ..schemas import model
|
from ..schemas import model
|
||||||
from ..schemas.model import History
|
from ..schemas.model import History
|
||||||
@ -35,7 +34,7 @@ def GetHistory(options):
|
|||||||
print(ex)
|
print(ex)
|
||||||
result = {}
|
result = {}
|
||||||
result["error_field"] = "call failed"
|
result["error_field"] = "call failed"
|
||||||
return jsonify("call failed"), 500
|
return json.dumps("call failed"), 500
|
||||||
finally:
|
finally:
|
||||||
if pooledConnection is not None:
|
if pooledConnection is not None:
|
||||||
pooledConnection.close()
|
pooledConnection.close()
|
||||||
|
|||||||
@ -1,7 +1,7 @@
|
|||||||
|
import json
|
||||||
import logging
|
import logging
|
||||||
import pydapper
|
import pydapper
|
||||||
import bcrypt
|
import bcrypt
|
||||||
from flask import jsonify
|
|
||||||
|
|
||||||
from ..schemas import model
|
from ..schemas import model
|
||||||
from .. import local_db
|
from .. import local_db
|
||||||
@ -35,24 +35,24 @@ def GetUser(options):
|
|||||||
"user_name": data[0].user_name,
|
"user_name": data[0].user_name,
|
||||||
"user_phone": data[0].user_phone,
|
"user_phone": data[0].user_phone,
|
||||||
"user_email": data[0].user_email,
|
"user_email": data[0].user_email,
|
||||||
"notify_email": model._coerce_bool(data[0].notify_email),
|
"notify_email": data[0].notify_email,
|
||||||
"notify_whatsapp": model._coerce_bool(data[0].notify_whatsapp),
|
"notify_whatsapp": data[0].notify_whatsapp,
|
||||||
"notify_signal": model._coerce_bool(data[0].notify_signal),
|
"notify_signal": data[0].notify_signal,
|
||||||
"notify_popup": model._coerce_bool(data[0].notify_popup),
|
"notify_popup": data[0].notify_popup,
|
||||||
"notify_on": model.notification_types_to_names(model.bitflag_to_list(data[0].notify_event))
|
"notify_on": model.notification_types_to_names(model.bitflag_to_list(data[0].notify_event))
|
||||||
}
|
}
|
||||||
token = jwt_handler.generate_jwt(payload=result, lifetime=120) # generate token valid 60 mins
|
token = jwt_handler.generate_jwt(payload=result, lifetime=120) # generate token valid 60 mins
|
||||||
result["token"] = token # add token to user data
|
result["token"] = token # add token to user data
|
||||||
return jsonify(result), 200
|
return json.dumps(result), 200, {'Content-Type': 'application/json; charset=utf-8'}
|
||||||
|
|
||||||
if len(data) > 1:
|
if len(data) > 1:
|
||||||
result = {}
|
result = {}
|
||||||
result["error_field"] = "credential lookup mismatch"
|
result["error_field"] = "credential lookup mismatch"
|
||||||
return jsonify(result), 500
|
return json.dumps(result), 500, {'Content-Type': 'application/json; charset=utf-8'}
|
||||||
|
|
||||||
result = {}
|
result = {}
|
||||||
result["error_field"] = "invalid credentials"
|
result["error_field"] = "invalid credentials"
|
||||||
return jsonify(result), 403
|
return json.dumps(result), 403, {'Content-Type': 'application/json; charset=utf-8'}
|
||||||
|
|
||||||
except Exception as ex:
|
except Exception as ex:
|
||||||
logging.error(ex)
|
logging.error(ex)
|
||||||
@ -60,7 +60,7 @@ def GetUser(options):
|
|||||||
result = {}
|
result = {}
|
||||||
result["error_field"] = "call failed"
|
result["error_field"] = "call failed"
|
||||||
result["error_description"] = str(ex)
|
result["error_description"] = str(ex)
|
||||||
return jsonify(result), 500
|
return json.dumps(result), 500, {'Content-Type': 'application/json; charset=utf-8'}
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
if pooledConnection is not None:
|
if pooledConnection is not None:
|
||||||
|
|||||||
@ -1,7 +1,6 @@
|
|||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
import pydapper
|
import pydapper
|
||||||
from flask import jsonify
|
|
||||||
|
|
||||||
from ..schemas import model
|
from ..schemas import model
|
||||||
from .. import local_db
|
from .. import local_db
|
||||||
@ -29,7 +28,7 @@ def GetNotifications(token, participant_id=None):
|
|||||||
print(ex)
|
print(ex)
|
||||||
result = {}
|
result = {}
|
||||||
result["error_field"] = "call failed"
|
result["error_field"] = "call failed"
|
||||||
return jsonify(result), 500
|
return json.dumps(result), 500, {'Content-Type': 'application/json; charset=utf-8'}
|
||||||
finally:
|
finally:
|
||||||
if pooledConnection is not None:
|
if pooledConnection is not None:
|
||||||
pooledConnection.close()
|
pooledConnection.close()
|
||||||
|
|||||||
@ -1,7 +1,6 @@
|
|||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
import pydapper
|
import pydapper
|
||||||
from flask import jsonify
|
|
||||||
|
|
||||||
from ..schemas import model
|
from ..schemas import model
|
||||||
from .. import local_db
|
from .. import local_db
|
||||||
@ -60,7 +59,7 @@ def GetParticipant(options):
|
|||||||
print(ex)
|
print(ex)
|
||||||
result = {}
|
result = {}
|
||||||
result["error_field"] = "call failed"
|
result["error_field"] = "call failed"
|
||||||
return jsonify("call failed"), 500
|
return json.dumps("call failed"), 500
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
if pooledConnection is not None:
|
if pooledConnection is not None:
|
||||||
|
|||||||
@ -1,7 +1,6 @@
|
|||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
import pydapper
|
import pydapper
|
||||||
from flask import jsonify
|
|
||||||
|
|
||||||
from ..schemas import model
|
from ..schemas import model
|
||||||
from .. import local_db
|
from .. import local_db
|
||||||
@ -24,7 +23,7 @@ def GetPorts(token):
|
|||||||
print(ex)
|
print(ex)
|
||||||
result = {}
|
result = {}
|
||||||
result["error_field"] = "call failed"
|
result["error_field"] = "call failed"
|
||||||
return jsonify(result), 500
|
return json.dumps(result), 500
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
if pooledConnection is not None:
|
if pooledConnection is not None:
|
||||||
|
|||||||
@ -2,7 +2,6 @@ import json
|
|||||||
import logging
|
import logging
|
||||||
import traceback
|
import traceback
|
||||||
import pydapper
|
import pydapper
|
||||||
from flask import jsonify
|
|
||||||
|
|
||||||
from ..schemas import model
|
from ..schemas import model
|
||||||
from .. import local_db
|
from .. import local_db
|
||||||
@ -45,7 +44,7 @@ def GetShipcalls(options):
|
|||||||
print(ex)
|
print(ex)
|
||||||
result = {}
|
result = {}
|
||||||
result["error_field"] = "call failed"
|
result["error_field"] = "call failed"
|
||||||
return jsonify(result), 500
|
return json.dumps(result), 500, {'Content-Type': 'application/json; charset=utf-8'}
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
if pooledConnection is not None:
|
if pooledConnection is not None:
|
||||||
@ -179,7 +178,7 @@ def PostShipcalls(schemaModel):
|
|||||||
print(ex)
|
print(ex)
|
||||||
result = {}
|
result = {}
|
||||||
result["error_field"] = "call failed"
|
result["error_field"] = "call failed"
|
||||||
return jsonify(result), 500
|
return json.dumps(result), 500, {'Content-Type': 'application/json; charset=utf-8'}
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
if pooledConnection is not None:
|
if pooledConnection is not None:
|
||||||
@ -206,7 +205,7 @@ def PutShipcalls(schemaModel, original_payload=None):
|
|||||||
|
|
||||||
theshipcall = commands.query_single_or_default("SELECT * FROM shipcall where id = ?id?", sentinel, param={"id" : schemaModel["id"]})
|
theshipcall = commands.query_single_or_default("SELECT * FROM shipcall where id = ?id?", sentinel, param={"id" : schemaModel["id"]})
|
||||||
if theshipcall is sentinel:
|
if theshipcall is sentinel:
|
||||||
return jsonify("no such record"), 404
|
return json.dumps("no such record"), 404, {'Content-Type': 'application/json; charset=utf-8'}
|
||||||
|
|
||||||
was_canceled = theshipcall["canceled"]
|
was_canceled = theshipcall["canceled"]
|
||||||
|
|
||||||
@ -326,7 +325,7 @@ def PutShipcalls(schemaModel, original_payload=None):
|
|||||||
print(ex)
|
print(ex)
|
||||||
result = {}
|
result = {}
|
||||||
result["error_field"] = "call failed"
|
result["error_field"] = "call failed"
|
||||||
return jsonify(result), 500
|
return json.dumps(result), 500, {'Content-Type': 'application/json; charset=utf-8'}
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
if pooledConnection is not None:
|
if pooledConnection is not None:
|
||||||
|
|||||||
@ -1,7 +1,6 @@
|
|||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
import pydapper
|
import pydapper
|
||||||
from flask import jsonify
|
|
||||||
|
|
||||||
from ..schemas import model
|
from ..schemas import model
|
||||||
from .. import local_db
|
from .. import local_db
|
||||||
@ -27,7 +26,7 @@ def GetShips(token):
|
|||||||
print(ex)
|
print(ex)
|
||||||
result = {}
|
result = {}
|
||||||
result["error_field"] = "call failed"
|
result["error_field"] = "call failed"
|
||||||
return jsonify(result), 500
|
return json.dumps(result), 500, {'Content-Type': 'application/json; charset=utf-8'}
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
if pooledConnection is not None:
|
if pooledConnection is not None:
|
||||||
@ -91,7 +90,7 @@ def PostShip(schemaModel):
|
|||||||
print(ex)
|
print(ex)
|
||||||
result = {}
|
result = {}
|
||||||
result["error_field"] = "call failed"
|
result["error_field"] = "call failed"
|
||||||
return jsonify(result), 500
|
return json.dumps(result), 500, {'Content-Type': 'application/json; charset=utf-8'}
|
||||||
finally:
|
finally:
|
||||||
if pooledConnection is not None:
|
if pooledConnection is not None:
|
||||||
pooledConnection.close()
|
pooledConnection.close()
|
||||||
@ -134,7 +133,7 @@ def PutShip(schemaModel):
|
|||||||
print(ex)
|
print(ex)
|
||||||
result = {}
|
result = {}
|
||||||
result["error_field"] = "call failed"
|
result["error_field"] = "call failed"
|
||||||
return jsonify(result), 500
|
return json.dumps(result), 500, {'Content-Type': 'application/json; charset=utf-8'}
|
||||||
finally:
|
finally:
|
||||||
if pooledConnection is not None:
|
if pooledConnection is not None:
|
||||||
pooledConnection.close()
|
pooledConnection.close()
|
||||||
@ -159,14 +158,14 @@ def DeleteShip(options):
|
|||||||
|
|
||||||
result = {}
|
result = {}
|
||||||
result["error_field"] = "no such record"
|
result["error_field"] = "no such record"
|
||||||
return jsonify(result), 404
|
return json.dumps(result), 404, {'Content-Type': 'application/json; charset=utf-8'}
|
||||||
|
|
||||||
except Exception as ex:
|
except Exception as ex:
|
||||||
logging.error(ex)
|
logging.error(ex)
|
||||||
print(ex)
|
print(ex)
|
||||||
result = {}
|
result = {}
|
||||||
result["error_field"] = "call failed"
|
result["error_field"] = "call failed"
|
||||||
return jsonify(result), 500
|
return json.dumps(result), 500, {'Content-Type': 'application/json; charset=utf-8'}
|
||||||
finally:
|
finally:
|
||||||
if pooledConnection is not None:
|
if pooledConnection is not None:
|
||||||
pooledConnection.close()
|
pooledConnection.close()
|
||||||
|
|||||||
@ -2,7 +2,6 @@ import json
|
|||||||
import logging
|
import logging
|
||||||
import traceback
|
import traceback
|
||||||
import pydapper
|
import pydapper
|
||||||
from flask import jsonify
|
|
||||||
from enum import Enum, Flag
|
from enum import Enum, Flag
|
||||||
|
|
||||||
from ..schemas import model
|
from ..schemas import model
|
||||||
@ -36,7 +35,7 @@ def GetTimes(options):
|
|||||||
print(ex)
|
print(ex)
|
||||||
result = {}
|
result = {}
|
||||||
result["error_field"] = "call failed"
|
result["error_field"] = "call failed"
|
||||||
return jsonify(result), 500
|
return json.dumps(result), 500, {'Content-Type': 'application/json; charset=utf-8'}
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
if pooledConnection is not None:
|
if pooledConnection is not None:
|
||||||
@ -109,7 +108,7 @@ def PostTimes(schemaModel):
|
|||||||
print(ex)
|
print(ex)
|
||||||
result = {}
|
result = {}
|
||||||
result["error_field"] = "call failed"
|
result["error_field"] = "call failed"
|
||||||
return jsonify(result), 500
|
return json.dumps(result), 500, {'Content-Type': 'application/json; charset=utf-8'}
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
if pooledConnection is not None:
|
if pooledConnection is not None:
|
||||||
@ -131,7 +130,7 @@ def PutTimes(schemaModel, original_payload=None):
|
|||||||
sentinel = object()
|
sentinel = object()
|
||||||
existing_times = commands.query_single_or_default("SELECT * FROM times WHERE id = ?id?", sentinel, param={"id": schemaModel["id"]})
|
existing_times = commands.query_single_or_default("SELECT * FROM times WHERE id = ?id?", sentinel, param={"id": schemaModel["id"]})
|
||||||
if existing_times is sentinel:
|
if existing_times is sentinel:
|
||||||
return jsonify("no such record"), 404
|
return json.dumps("no such record"), 404, {'Content-Type': 'application/json; charset=utf-8'}
|
||||||
|
|
||||||
provided_keys = set(original_payload.keys()) if isinstance(original_payload, dict) else None
|
provided_keys = set(original_payload.keys()) if isinstance(original_payload, dict) else None
|
||||||
|
|
||||||
@ -178,7 +177,7 @@ def PutTimes(schemaModel, original_payload=None):
|
|||||||
print(ex)
|
print(ex)
|
||||||
result = {}
|
result = {}
|
||||||
result["error_field"] = "call failed"
|
result["error_field"] = "call failed"
|
||||||
return jsonify(result), 500
|
return json.dumps(result), 500, {'Content-Type': 'application/json; charset=utf-8'}
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
if pooledConnection is not None:
|
if pooledConnection is not None:
|
||||||
@ -209,14 +208,14 @@ def DeleteTimes(options):
|
|||||||
|
|
||||||
result = {}
|
result = {}
|
||||||
result["error_field"] = "no such record"
|
result["error_field"] = "no such record"
|
||||||
return jsonify(result), 404
|
return json.dumps(result), 404, {'Content-Type': 'application/json; charset=utf-8'}
|
||||||
|
|
||||||
except Exception as ex:
|
except Exception as ex:
|
||||||
logging.error(ex)
|
logging.error(ex)
|
||||||
print(ex)
|
print(ex)
|
||||||
result = {}
|
result = {}
|
||||||
result["error_field"] = "call failed"
|
result["error_field"] = "call failed"
|
||||||
return jsonify(result), 500
|
return json.dumps(result), 500, {'Content-Type': 'application/json; charset=utf-8'}
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
if pooledConnection is not None:
|
if pooledConnection is not None:
|
||||||
|
|||||||
@ -2,7 +2,6 @@ import json
|
|||||||
import logging
|
import logging
|
||||||
import pydapper
|
import pydapper
|
||||||
import bcrypt
|
import bcrypt
|
||||||
from flask import jsonify
|
|
||||||
|
|
||||||
from ..schemas import model
|
from ..schemas import model
|
||||||
from .. import local_db
|
from .. import local_db
|
||||||
@ -28,7 +27,7 @@ def PutUser(schemaModel):
|
|||||||
theuser = commands.query_single_or_default("SELECT * FROM user where id = ?id?", sentinel, param={"id" : schemaModel["id"]}, model=model.User)
|
theuser = commands.query_single_or_default("SELECT * FROM user where id = ?id?", sentinel, param={"id" : schemaModel["id"]}, model=model.User)
|
||||||
if theuser is sentinel:
|
if theuser is sentinel:
|
||||||
# #TODO: result = {"message":"no such record"} -> json.dumps
|
# #TODO: result = {"message":"no such record"} -> json.dumps
|
||||||
return jsonify({"error_field": "no such record"}), 404
|
return json.dumps("no such record"), 404, {'Content-Type': 'application/json; charset=utf-8'}
|
||||||
|
|
||||||
# see if we need to update public fields
|
# see if we need to update public fields
|
||||||
# #TODO_determine: this filter blocks Put-Requests, which update the 'notify_email', 'notify_whatsapp', 'notify_signal', 'notify_popup' fields
|
# #TODO_determine: this filter blocks Put-Requests, which update the 'notify_email', 'notify_whatsapp', 'notify_signal', 'notify_popup' fields
|
||||||
@ -71,16 +70,16 @@ def PutUser(schemaModel):
|
|||||||
else:
|
else:
|
||||||
result = {}
|
result = {}
|
||||||
result["error_field"] = "old password invalid"
|
result["error_field"] = "old password invalid"
|
||||||
return jsonify(result), 400
|
return json.dumps(result), 400, {'Content-Type': 'application/json; charset=utf-8'}
|
||||||
|
|
||||||
return jsonify({"id" : schemaModel["id"]}), 200
|
return json.dumps({"id" : schemaModel["id"]}), 200
|
||||||
|
|
||||||
except Exception as ex:
|
except Exception as ex:
|
||||||
logging.error(ex)
|
logging.error(ex)
|
||||||
print(ex)
|
print(ex)
|
||||||
result = {}
|
result = {}
|
||||||
result["error_field"] = "call failed"
|
result["error_field"] = "call failed"
|
||||||
return jsonify(result), 500
|
return json.dumps(result), 500, {'Content-Type': 'application/json; charset=utf-8'}
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
if pooledConnection is not None:
|
if pooledConnection is not None:
|
||||||
|
|||||||
@ -24,15 +24,6 @@ def obj_dict(obj):
|
|||||||
return obj.to_json()
|
return obj.to_json()
|
||||||
return obj.__dict__
|
return obj.__dict__
|
||||||
|
|
||||||
def _coerce_bool(value):
|
|
||||||
if value is None:
|
|
||||||
return None
|
|
||||||
if isinstance(value, bool):
|
|
||||||
return value
|
|
||||||
if isinstance(value, int) and value in (0, 1):
|
|
||||||
return bool(value)
|
|
||||||
return value
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class Berth(Schema):
|
class Berth(Schema):
|
||||||
id: int
|
id: int
|
||||||
@ -45,19 +36,6 @@ class Berth(Schema):
|
|||||||
modified: datetime
|
modified: datetime
|
||||||
deleted: bool
|
deleted: bool
|
||||||
|
|
||||||
def to_json(self):
|
|
||||||
return {
|
|
||||||
"id": self.id,
|
|
||||||
"name": self.name,
|
|
||||||
"lock": _coerce_bool(self.lock),
|
|
||||||
"owner_id": self.owner_id,
|
|
||||||
"authority_id": self.authority_id,
|
|
||||||
"port_id": self.port_id,
|
|
||||||
"created": self.created.isoformat() if self.created else "",
|
|
||||||
"modified": self.modified.isoformat() if self.modified else "",
|
|
||||||
"deleted": _coerce_bool(self.deleted),
|
|
||||||
}
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class Port(Schema):
|
class Port(Schema):
|
||||||
id: int
|
id: int
|
||||||
@ -67,16 +45,6 @@ class Port(Schema):
|
|||||||
modified: datetime
|
modified: datetime
|
||||||
deleted: bool
|
deleted: bool
|
||||||
|
|
||||||
def to_json(self):
|
|
||||||
return {
|
|
||||||
"id": self.id,
|
|
||||||
"name": self.name,
|
|
||||||
"locode": self.locode,
|
|
||||||
"created": self.created.isoformat() if self.created else "",
|
|
||||||
"modified": self.modified.isoformat() if self.modified else "",
|
|
||||||
"deleted": _coerce_bool(self.deleted),
|
|
||||||
}
|
|
||||||
|
|
||||||
class OperationType(IntEnum):
|
class OperationType(IntEnum):
|
||||||
undefined = 0
|
undefined = 0
|
||||||
insert = 1
|
insert = 1
|
||||||
@ -244,21 +212,6 @@ class Participant(Schema):
|
|||||||
deleted: bool
|
deleted: bool
|
||||||
ports: List[int] = field(default_factory=list)
|
ports: List[int] = field(default_factory=list)
|
||||||
|
|
||||||
def to_json(self):
|
|
||||||
return {
|
|
||||||
"id": self.id,
|
|
||||||
"name": self.name,
|
|
||||||
"street": self.street,
|
|
||||||
"postal_code": self.postal_code,
|
|
||||||
"city": self.city,
|
|
||||||
"type": self.type,
|
|
||||||
"flags": self.flags,
|
|
||||||
"created": self.created.isoformat() if self.created else "",
|
|
||||||
"modified": self.modified.isoformat() if self.modified else "",
|
|
||||||
"deleted": _coerce_bool(self.deleted),
|
|
||||||
"ports": self.ports,
|
|
||||||
}
|
|
||||||
|
|
||||||
@validates("type")
|
@validates("type")
|
||||||
def validate_type(self, value, **kwargs):
|
def validate_type(self, value, **kwargs):
|
||||||
# e.g., when an IntFlag has the values 1,2,4; the maximum valid value is 7
|
# e.g., when an IntFlag has the values 1,2,4; the maximum valid value is 7
|
||||||
@ -417,25 +370,25 @@ class Shipcall:
|
|||||||
"etd": self.etd.isoformat() if self.etd else "",
|
"etd": self.etd.isoformat() if self.etd else "",
|
||||||
"arrival_berth_id": self.arrival_berth_id,
|
"arrival_berth_id": self.arrival_berth_id,
|
||||||
"departure_berth_id": self.departure_berth_id,
|
"departure_berth_id": self.departure_berth_id,
|
||||||
"tug_required": _coerce_bool(self.tug_required),
|
"tug_required": self.tug_required,
|
||||||
"pilot_required": _coerce_bool(self.pilot_required),
|
"pilot_required": self.pilot_required,
|
||||||
"flags": self.flags,
|
"flags": self.flags,
|
||||||
"pier_side": _coerce_bool(self.pier_side),
|
"pier_side": self.pier_side,
|
||||||
"bunkering": _coerce_bool(self.bunkering),
|
"bunkering": self.bunkering,
|
||||||
"replenishing_terminal": _coerce_bool(self.replenishing_terminal),
|
"replenishing_terminal": self.replenishing_terminal,
|
||||||
"replenishing_lock": _coerce_bool(self.replenishing_lock),
|
"replenishing_lock": self.replenishing_lock,
|
||||||
"draft": self.draft,
|
"draft": self.draft,
|
||||||
"tidal_window_from": self.tidal_window_from.isoformat() if self.tidal_window_from else "",
|
"tidal_window_from": self.tidal_window_from.isoformat() if self.tidal_window_from else "",
|
||||||
"tidal_window_to": self.tidal_window_to.isoformat() if self.tidal_window_to else "",
|
"tidal_window_to": self.tidal_window_to.isoformat() if self.tidal_window_to else "",
|
||||||
"rain_sensitive_cargo": _coerce_bool(self.rain_sensitive_cargo),
|
"rain_sensitive_cargo": self.rain_sensitive_cargo,
|
||||||
"recommended_tugs": self.recommended_tugs,
|
"recommended_tugs": self.recommended_tugs,
|
||||||
"anchored": _coerce_bool(self.anchored),
|
"anchored": self.anchored,
|
||||||
"moored_lock": _coerce_bool(self.moored_lock),
|
"moored_lock": self.moored_lock,
|
||||||
"canceled": _coerce_bool(self.canceled),
|
"canceled": self.canceled,
|
||||||
"evaluation": self.evaluation.name if isinstance(self.evaluation, IntEnum) else EvaluationType(self.evaluation).name,
|
"evaluation": self.evaluation.name if isinstance(self.evaluation, IntEnum) else EvaluationType(self.evaluation).name,
|
||||||
"evaluation_message": self.evaluation_message,
|
"evaluation_message": self.evaluation_message,
|
||||||
"evaluation_time": self.evaluation_time.isoformat() if self.evaluation_time else "",
|
"evaluation_time": self.evaluation_time.isoformat() if self.evaluation_time else "",
|
||||||
"evaluation_notifications_sent": _coerce_bool(self.evaluation_notifications_sent),
|
"evaluation_notifications_sent": self.evaluation_notifications_sent,
|
||||||
"time_ref_point": self.time_ref_point,
|
"time_ref_point": self.time_ref_point,
|
||||||
"port_id": self.port_id,
|
"port_id": self.port_id,
|
||||||
"created": self.created.isoformat() if self.created else "",
|
"created": self.created.isoformat() if self.created else "",
|
||||||
@ -447,39 +400,7 @@ class Shipcall:
|
|||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def from_query_row(self, id, ship_id, type, eta, voyage, etd, arrival_berth_id, departure_berth_id, tug_required, pilot_required, flags, pier_side, bunkering, replenishing_terminal, replenishing_lock, draft, tidal_window_from, tidal_window_to, rain_sensitive_cargo, recommended_tugs, anchored, moored_lock, canceled, evaluation, evaluation_message, evaluation_time, evaluation_notifications_sent, time_ref_point, port_id, created, modified):
|
def from_query_row(self, id, ship_id, type, eta, voyage, etd, arrival_berth_id, departure_berth_id, tug_required, pilot_required, flags, pier_side, bunkering, replenishing_terminal, replenishing_lock, draft, tidal_window_from, tidal_window_to, rain_sensitive_cargo, recommended_tugs, anchored, moored_lock, canceled, evaluation, evaluation_message, evaluation_time, evaluation_notifications_sent, time_ref_point, port_id, created, modified):
|
||||||
return self(
|
return self(id, ship_id, ShipcallType(type), eta, voyage, etd, arrival_berth_id, departure_berth_id, tug_required, pilot_required, flags, pier_side, bunkering, replenishing_terminal, replenishing_lock, draft, tidal_window_from, tidal_window_to, rain_sensitive_cargo, recommended_tugs, anchored, moored_lock, canceled, EvaluationType(evaluation), evaluation_message, evaluation_time, evaluation_notifications_sent, time_ref_point, port_id, created, modified)
|
||||||
id,
|
|
||||||
ship_id,
|
|
||||||
ShipcallType(type),
|
|
||||||
eta,
|
|
||||||
voyage,
|
|
||||||
etd,
|
|
||||||
arrival_berth_id,
|
|
||||||
departure_berth_id,
|
|
||||||
_coerce_bool(tug_required),
|
|
||||||
_coerce_bool(pilot_required),
|
|
||||||
flags,
|
|
||||||
_coerce_bool(pier_side),
|
|
||||||
_coerce_bool(bunkering),
|
|
||||||
_coerce_bool(replenishing_terminal),
|
|
||||||
_coerce_bool(replenishing_lock),
|
|
||||||
draft,
|
|
||||||
tidal_window_from,
|
|
||||||
tidal_window_to,
|
|
||||||
_coerce_bool(rain_sensitive_cargo),
|
|
||||||
recommended_tugs,
|
|
||||||
_coerce_bool(anchored),
|
|
||||||
_coerce_bool(moored_lock),
|
|
||||||
_coerce_bool(canceled),
|
|
||||||
EvaluationType(evaluation),
|
|
||||||
evaluation_message,
|
|
||||||
evaluation_time,
|
|
||||||
_coerce_bool(evaluation_notifications_sent),
|
|
||||||
time_ref_point,
|
|
||||||
port_id,
|
|
||||||
created,
|
|
||||||
modified
|
|
||||||
)
|
|
||||||
|
|
||||||
class ShipcallId(Schema):
|
class ShipcallId(Schema):
|
||||||
pass
|
pass
|
||||||
@ -594,10 +515,10 @@ class UserSchema(Schema):
|
|||||||
super().__init__(unknown=None)
|
super().__init__(unknown=None)
|
||||||
pass
|
pass
|
||||||
id = fields.Integer(required=True)
|
id = fields.Integer(required=True)
|
||||||
first_name = fields.String(allow_none=True, required=False, validate=[validate.Length(max=45)])
|
first_name = fields.String(allow_none=True, required=False, validate=[validate.Length(max=64)])
|
||||||
last_name = fields.String(allow_none=True, required=False, validate=[validate.Length(max=45)])
|
last_name = fields.String(allow_none=True, required=False, validate=[validate.Length(max=64)])
|
||||||
user_phone = fields.String(allow_none=True, required=False, validate=[validate.Length(max=128)])
|
user_phone = fields.String(allow_none=True, required=False)
|
||||||
user_email = fields.String(allow_none=True, required=False, validate=[validate.Length(max=128)])
|
user_email = fields.String(allow_none=True, required=False, validate=[validate.Length(max=64)])
|
||||||
old_password = fields.String(allow_none=True, required=False, validate=[validate.Length(max=128)])
|
old_password = fields.String(allow_none=True, required=False, validate=[validate.Length(max=128)])
|
||||||
new_password = fields.String(allow_none=True, required=False, validate=[validate.Length(min=6, max=128)])
|
new_password = fields.String(allow_none=True, required=False, validate=[validate.Length(min=6, max=128)])
|
||||||
notify_email = fields.Bool(allow_none=True, required=False)
|
notify_email = fields.Bool(allow_none=True, required=False)
|
||||||
@ -646,34 +567,6 @@ class Times:
|
|||||||
created: datetime
|
created: datetime
|
||||||
modified: datetime
|
modified: datetime
|
||||||
|
|
||||||
def to_json(self):
|
|
||||||
return {
|
|
||||||
"id": self.id,
|
|
||||||
"eta_berth": self.eta_berth.isoformat() if self.eta_berth else "",
|
|
||||||
"eta_berth_fixed": _coerce_bool(self.eta_berth_fixed),
|
|
||||||
"etd_berth": self.etd_berth.isoformat() if self.etd_berth else "",
|
|
||||||
"etd_berth_fixed": _coerce_bool(self.etd_berth_fixed),
|
|
||||||
"lock_time": self.lock_time.isoformat() if self.lock_time else "",
|
|
||||||
"lock_time_fixed": _coerce_bool(self.lock_time_fixed),
|
|
||||||
"zone_entry": self.zone_entry.isoformat() if self.zone_entry else "",
|
|
||||||
"zone_entry_fixed": _coerce_bool(self.zone_entry_fixed),
|
|
||||||
"operations_start": self.operations_start.isoformat() if self.operations_start else "",
|
|
||||||
"operations_end": self.operations_end.isoformat() if self.operations_end else "",
|
|
||||||
"remarks": self.remarks,
|
|
||||||
"participant_id": self.participant_id,
|
|
||||||
"berth_id": self.berth_id,
|
|
||||||
"berth_info": self.berth_info,
|
|
||||||
"pier_side": _coerce_bool(self.pier_side),
|
|
||||||
"participant_type": self.participant_type,
|
|
||||||
"shipcall_id": self.shipcall_id,
|
|
||||||
"ata": self.ata.isoformat() if self.ata else "",
|
|
||||||
"atd": self.atd.isoformat() if self.atd else "",
|
|
||||||
"eta_interval_end": self.eta_interval_end.isoformat() if self.eta_interval_end else "",
|
|
||||||
"etd_interval_end": self.etd_interval_end.isoformat() if self.etd_interval_end else "",
|
|
||||||
"created": self.created.isoformat() if self.created else "",
|
|
||||||
"modified": self.modified.isoformat() if self.modified else "",
|
|
||||||
}
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class User:
|
class User:
|
||||||
|
|
||||||
@ -717,23 +610,6 @@ class Ship:
|
|||||||
modified: datetime
|
modified: datetime
|
||||||
deleted: bool
|
deleted: bool
|
||||||
|
|
||||||
def to_json(self):
|
|
||||||
return {
|
|
||||||
"id": self.id,
|
|
||||||
"name": self.name,
|
|
||||||
"imo": self.imo,
|
|
||||||
"callsign": self.callsign,
|
|
||||||
"participant_id": self.participant_id,
|
|
||||||
"length": self.length,
|
|
||||||
"width": self.width,
|
|
||||||
"is_tug": _coerce_bool(self.is_tug),
|
|
||||||
"bollard_pull": self.bollard_pull,
|
|
||||||
"eni": self.eni,
|
|
||||||
"created": self.created.isoformat() if self.created else "",
|
|
||||||
"modified": self.modified.isoformat() if self.modified else "",
|
|
||||||
"deleted": _coerce_bool(self.deleted),
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
class ShipSchema(Schema):
|
class ShipSchema(Schema):
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
|
|||||||
@ -1,4 +1,5 @@
|
|||||||
from flask import request, jsonify
|
import json
|
||||||
|
from flask import request
|
||||||
from .jwt_handler import decode_jwt
|
from .jwt_handler import decode_jwt
|
||||||
|
|
||||||
def check_jwt():
|
def check_jwt():
|
||||||
@ -24,11 +25,11 @@ def auth_guard(role=None):
|
|||||||
try:
|
try:
|
||||||
user_data = check_jwt()
|
user_data = check_jwt()
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
return jsonify({"error_field" : f'{e}', "status": 401}), 401
|
return json.dumps({"error_field" : f'{e}', "status": 401}), 401
|
||||||
if role and role not in user_data['roles']:
|
if role and role not in user_data['roles']:
|
||||||
return jsonify({"error_field": 'Authorization required.', "status" : 403}), 403
|
return json.dumps({"error_field": 'Authorization required.', "status" : 403}), 403
|
||||||
# get on to original route
|
# get on to original route
|
||||||
return route_function(*args, **kwargs)
|
return route_function(*args, **kwargs)
|
||||||
decorated_function.__name__ = route_function.__name__
|
decorated_function.__name__ = route_function.__name__
|
||||||
return decorated_function
|
return decorated_function
|
||||||
return wrapper
|
return wrapper
|
||||||
@ -2,7 +2,6 @@ import logging
|
|||||||
import typing
|
import typing
|
||||||
import json
|
import json
|
||||||
import sys
|
import sys
|
||||||
from flask import jsonify
|
|
||||||
from marshmallow import ValidationError
|
from marshmallow import ValidationError
|
||||||
from werkzeug.exceptions import Forbidden
|
from werkzeug.exceptions import Forbidden
|
||||||
|
|
||||||
@ -30,7 +29,7 @@ def unbundle_(errors, unbundled=[]):
|
|||||||
[{'error_field':'keya', 'error_description':['keyb']}, {'error_field':'keyc12', 'error_description':[12]}]
|
[{'error_field':'keya', 'error_description':['keyb']}, {'error_field':'keyc12', 'error_description':[12]}]
|
||||||
As can be seen, only the subkeys and their respective value are received. Each value is *always* a list.
|
As can be seen, only the subkeys and their respective value are received. Each value is *always* a list.
|
||||||
"""
|
"""
|
||||||
{k:unbundle_(v,unbundled=unbundled) if isinstance(v,dict) else unbundled.append({"error_field":k, "error_description": str(v[0] if isinstance(v,list) else v)}) for k,v in errors.items()}
|
{k:unbundle_(v,unbundled=unbundled) if isinstance(v,dict) else unbundled.append({"error_field":k, "error_description":v[0] if isinstance(v,list) else str(v)}) for k,v in errors.items()}
|
||||||
return
|
return
|
||||||
|
|
||||||
def unbundle_validation_error_message(message):
|
def unbundle_validation_error_message(message):
|
||||||
@ -41,8 +40,8 @@ def unbundle_validation_error_message(message):
|
|||||||
unbundled = []
|
unbundled = []
|
||||||
unbundle_(message, unbundled=unbundled)
|
unbundle_(message, unbundled=unbundled)
|
||||||
if len(unbundled)>0:
|
if len(unbundled)>0:
|
||||||
error_field = "ValidationError in the following field(s): " + " & ".join([str(unb["error_field"]) for unb in unbundled])
|
error_field = "ValidationError in the following field(s): " + " & ".join([unb["error_field"] for unb in unbundled])
|
||||||
error_description = " " . join([str(unb["error_description"]) for unb in unbundled])
|
error_description = " " . join([unb["error_description"] for unb in unbundled])
|
||||||
else:
|
else:
|
||||||
error_field = "ValidationError"
|
error_field = "ValidationError"
|
||||||
error_description = "unknown validation error"
|
error_description = "unknown validation error"
|
||||||
@ -54,28 +53,35 @@ def create_validation_error_response(ex:ValidationError, status_code:int=400, cr
|
|||||||
(error_field, error_description) = unbundle_validation_error_message(message)
|
(error_field, error_description) = unbundle_validation_error_message(message)
|
||||||
json_response = create_default_json_response_format(error_field=error_field, error_description=error_description)
|
json_response = create_default_json_response_format(error_field=error_field, error_description=error_description)
|
||||||
|
|
||||||
|
# json.dumps with default=str automatically converts non-serializable values to strings. Hence, datetime objects (which are not)
|
||||||
|
# natively serializable are properly serialized.
|
||||||
|
serialized_response = json.dumps(json_response, default=str)
|
||||||
|
|
||||||
if create_log:
|
if create_log:
|
||||||
logging.warning(ex) if ex is not None else logging.warning(message)
|
logging.warning(ex) if ex is not None else logging.warning(message)
|
||||||
# print(ex) if ex is not None else print(message)
|
# print(ex) if ex is not None else print(message)
|
||||||
return (jsonify(json_response), status_code)
|
return (serialized_response, status_code)
|
||||||
|
|
||||||
def create_werkzeug_error_response(ex:Forbidden, status_code:int=403, create_log:bool=True)->typing.Tuple[str,int]:
|
def create_werkzeug_error_response(ex:Forbidden, status_code:int=403, create_log:bool=True)->typing.Tuple[str,int]:
|
||||||
# json.dumps with default=str automatically converts non-serializable values to strings. Hence, datetime objects (which are not)
|
# json.dumps with default=str automatically converts non-serializable values to strings. Hence, datetime objects (which are not)
|
||||||
# natively serializable are properly serialized.
|
# natively serializable are properly serialized.
|
||||||
message = ex.description
|
message = ex.description
|
||||||
json_response = create_default_json_response_format(error_field=str(repr(ex)), error_description=message)
|
json_response = create_default_json_response_format(error_field=str(repr(ex)), error_description=message)
|
||||||
|
serialized_response = json.dumps(json_response, default=str)
|
||||||
|
|
||||||
if create_log:
|
if create_log:
|
||||||
logging.warning(ex) if ex is not None else logging.warning(message)
|
logging.warning(ex) if ex is not None else logging.warning(message)
|
||||||
# print(ex) if ex is not None else print(message)
|
# print(ex) if ex is not None else print(message)
|
||||||
return jsonify(json_response), status_code
|
return serialized_response, status_code
|
||||||
|
|
||||||
def create_dynamic_exception_response(ex, status_code:int=400, message:typing.Optional[str]=None, create_log:bool=True):
|
def create_dynamic_exception_response(ex, status_code:int=400, message:typing.Optional[str]=None, create_log:bool=True):
|
||||||
message = repr(ex) if message is None else message
|
message = repr(ex) if message is None else message
|
||||||
json_response = create_default_json_response_format(error_field="Exception", error_description=message)
|
json_response = create_default_json_response_format(error_field="Exception", error_description=message)
|
||||||
json_response["error_field"] = "call failed"
|
json_response["error_field"] = "call failed"
|
||||||
|
|
||||||
|
serialized_response = json.dumps(json_response, default=str)
|
||||||
|
|
||||||
if create_log:
|
if create_log:
|
||||||
logging.warning(ex) if ex is not None else logging.warning(message)
|
logging.warning(ex) if ex is not None else logging.warning(message)
|
||||||
# print(ex) if ex is not None else print(message)
|
# print(ex) if ex is not None else print(message)
|
||||||
return (jsonify(json_response), status_code)
|
return (serialized_response, status_code)
|
||||||
|
|||||||
@ -7,7 +7,7 @@ def base_url() -> str:
|
|||||||
# Example: https://dev.api.mycompany.com
|
# Example: https://dev.api.mycompany.com
|
||||||
url = os.environ.get("API_BASE_URL")
|
url = os.environ.get("API_BASE_URL")
|
||||||
if not url:
|
if not url:
|
||||||
url = "http://127.0.0.1:5000"
|
url = "http://neptun.fritz.box"
|
||||||
# raise RuntimeError("Set API_BASE_URL")
|
# raise RuntimeError("Set API_BASE_URL")
|
||||||
return url.rstrip("/")
|
return url.rstrip("/")
|
||||||
|
|
||||||
|
|||||||
@ -1,15 +1,6 @@
|
|||||||
import os
|
|
||||||
import pytest
|
|
||||||
import schemathesis
|
import schemathesis
|
||||||
|
|
||||||
schema = schemathesis.openapi.from_path(
|
schema = schemathesis.openapi.from_path("../../../misc/BreCalApi.yaml")
|
||||||
"../../../misc/BreCalApi.yaml",
|
|
||||||
)
|
|
||||||
schema.base_url = (os.environ.get("API_BASE_URL") or "http://127.0.0.1:5000").rstrip("/")
|
|
||||||
|
|
||||||
@pytest.fixture(scope="session", autouse=True)
|
|
||||||
def _set_schema_base_url(base_url: str) -> None:
|
|
||||||
schema.base_url = base_url
|
|
||||||
|
|
||||||
@schema.parametrize()
|
@schema.parametrize()
|
||||||
def test_api_conformance(
|
def test_api_conformance(
|
||||||
@ -18,13 +9,9 @@ def test_api_conformance(
|
|||||||
auth_headers: dict[str, str],
|
auth_headers: dict[str, str],
|
||||||
login_payload: dict[str, str],
|
login_payload: dict[str, str],
|
||||||
) -> None:
|
) -> None:
|
||||||
case.operation.schema.base_url = base_url
|
|
||||||
transport = getattr(case.operation.schema, "transport", None)
|
|
||||||
if transport is not None and hasattr(transport, "base_url"):
|
|
||||||
transport.base_url = base_url
|
|
||||||
# Calls your real service:
|
# Calls your real service:
|
||||||
if case.path == "/login" and case.method.upper() == "POST":
|
if case.path == "/login" and case.method.upper() == "POST":
|
||||||
response = case.call(base_url=base_url)
|
response = case.call(base_url=base_url, json=login_payload)
|
||||||
else:
|
else:
|
||||||
response = case.call(base_url=base_url, headers=auth_headers)
|
response = case.call(base_url=base_url, headers=auth_headers)
|
||||||
# Validates status code, headers, and body against the OpenAPI schema:
|
# Validates status code, headers, and body against the OpenAPI schema:
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user