centralizing SQL queries into a single object, SQLQuery, which provides quick access to build the queries. Created unit tests to make sure the object works as expected. SQLQuery object is not yet in active use, but only suggested with commented-code. Not fully finished yet. 203 unit tests, all passing.

This commit is contained in:
Max Metz 2024-05-28 18:45:15 +02:00
parent 277e28c518
commit f57b317a27
13 changed files with 452 additions and 10 deletions

View File

@ -53,7 +53,7 @@ def get_synchronous_shipcall_times_standalone(query_time:pd.Timestamp, all_df_ti
counts = sum(time_deltas_filtered) # int
return counts
def execute_sql_query_standalone(query, param, pooledConnection=None):
def execute_sql_query_standalone(query, param={}, pooledConnection=None, model=None, command_type="query"):
"""
execute an arbitrary query with a set of parameters, return the output and convert it to a list.
when the pooled connection is rebuilt, it will be closed at the end of the function.
@ -67,13 +67,24 @@ def execute_sql_query_standalone(query, param, pooledConnection=None):
# participant_query = "SELECT participant_id, type FROM shipcall_participant_map WHERE shipcall_id=?shipcall_id?";
# creates a generator
schemas = commands.query(query, model=dict, param=param, buffered=False)
try:
if command_type=="query":
if model is None:
schemas = commands.query(query, model=dict, param=param, buffered=False)
else:
schemas = commands.query(query, model=model, param=param, buffered=False)
# creates a list of results from the generator
schemas = [schema for schema in schemas]
if rebuild_pooled_connection:
pooledConnection.close()
# creates a list of results from the generator
schemas = [schema for schema in schemas]
elif command_type=="execute":
schemas = commands.execute(query, param=param)
else:
raise ValueError(command_type)
finally: # if needed, ensure that the pooled connection is closed.
if rebuild_pooled_connection:
pooledConnection.close()
return schemas
class SQLHandler():

View File

@ -10,7 +10,6 @@ def create_sql_query_shipcall_get(options:dict)->str:
options : dict. A dictionary, which must contains the 'past_days' key (int). Determines the range
by which shipcalls are filtered.
"""
logging.info(options)
query = ("SELECT s.id as id, ship_id, type, eta, voyage, etd, arrival_berth_id, departure_berth_id, tug_required, pilot_required, " +
"flags, s.pier_side, bunkering, replenishing_terminal, replenishing_lock, draft, tidal_window_from, " +
"tidal_window_to, rain_sensitive_cargo, recommended_tugs, anchored, moored_lock, canceled, evaluation, " +
@ -145,3 +144,114 @@ def create_sql_query_history_put()->str:
query = "INSERT INTO history (participant_id, shipcall_id, user_id, timestamp, eta, type, operation) VALUES (?pid?, ?scid?, ?uid?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, 1, 2)"
return query
def create_sql_query_user_put(schemaModel:dict):
query = "UPDATE user SET "
isNotFirst = False
for key in schemaModel.keys():
if key == "id":
continue
if key == "old_password":
continue
if key == "new_password":
continue
if isNotFirst:
query += ", "
isNotFirst = True
query += key + " = ?" + key + "? "
query += "WHERE id = ?id?"
return query
class SQLQuery():
"""
This class provides quick access to different SQL query functions, which creates default queries for the BreCal package.
Each method is callable without initializing the SQLQuery object.
Example:
SQLQuery.get_berths()
When the data violates one of the rules, a marshmallow.ValidationError is raised, which details the issues.
"""
def __init__(self) -> None:
pass
@staticmethod
def get_berth()->str:
query = "SELECT id, name, `lock`, owner_id, authority_id, created, modified, deleted FROM berth WHERE deleted = 0 ORDER BY name"
return query
@staticmethod
def get_history()->str:
query = "SELECT id, participant_id, shipcall_id, timestamp, eta, type, operation FROM history WHERE shipcall_id = ?shipcallid?"
return query
@staticmethod
def get_user()->str:
query = "SELECT id, participant_id, first_name, last_name, user_name, user_email, user_phone, password_hash, " +\
"api_key, notify_email, notify_whatsapp, notify_signal, notify_popup, created, modified FROM user " +\
"WHERE user_name = ?username? OR user_email = ?username?"
return query
@staticmethod
def get_notifications()->str:
query = "SELECT id, shipcall_id, level, type, message, created, modified FROM notification " + \
"WHERE shipcall_id = ?scid?"
return query
@staticmethod
def get_participant_by_user_id()->str:
query = "SELECT p.id as id, p.name as name, p.street as street, p.postal_code as postal_code, p.city as city, p.type as type, p.flags as flags, p.created as created, p.modified as modified, p.deleted as deleted FROM participant p INNER JOIN user u WHERE u.participant_id = p.id and u.id = ?userid?"
return query
@staticmethod
def get_participants()->str:
query = "SELECT id, name, street, postal_code, city, type, flags, created, modified, deleted FROM participant p ORDER BY p.name"
return query
@staticmethod
def get_shipcalls(options:dict={'past_days':3})->str:
assert 'past_days' in list(options.keys()), f"there must be a key 'past_days' in the options, which determines, how recent the returned list of shipcalls shall be." # part of a pytest.raises
query = create_sql_query_shipcall_get(options)
return query
@staticmethod
def get_ships()->str:
query = "SELECT id, name, imo, callsign, participant_id, length, width, is_tug, bollard_pull, eni, created, modified, deleted FROM ship ORDER BY name"
return query
@staticmethod
def get_times()->str:
query = "SELECT id, eta_berth, eta_berth_fixed, etd_berth, etd_berth_fixed, lock_time, lock_time_fixed, " + \
"zone_entry, zone_entry_fixed, operations_start, operations_end, remarks, shipcall_id, participant_id, " + \
"berth_id, berth_info, pier_side, participant_type, created, modified, ata, atd, eta_interval_end, etd_interval_end FROM times " + \
"WHERE times.shipcall_id = ?scid?"
return query
@staticmethod
def get_user_by_id():
query = "SELECT * FROM user where id = ?id?"
return query
@staticmethod
def get_user_put(schemaModel:dict):
# a pytest proves this method to be identical to create_sql_query_user_put(schemaModel)
prefix = "UPDATE user SET "
suffix = "WHERE id = ?id?"
center = [f"{key} = ?{key}? " for key in schemaModel.keys() if key not in ["id", "old_password", "new_password"]]
query = prefix + ", ".join(center) + suffix
return query
@staticmethod
def get_update_user_password()->str:
query = "UPDATE user SET password_hash = ?password_hash? WHERE id = ?id?"
return query
@staticmethod
def get_participants()->str:
query = "SELECT participant_id, type FROM shipcall_participant_map WHERE shipcall_id=?shipcall_id?"
return query

View File

@ -4,6 +4,7 @@ import pydapper
from ..schemas import model
from .. import local_db
from BreCal.database.sql_queries import SQLQuery
def GetBerths(token):
"""
@ -13,6 +14,8 @@ def GetBerths(token):
try:
pooledConnection = local_db.getPoolConnection()
commands = pydapper.using(pooledConnection)
# query = SQLQuery.get_berth()
# data = commands.query(query, model=model.Berth)
data = commands.query("SELECT id, name, `lock`, owner_id, authority_id, created, modified, deleted FROM berth WHERE deleted = 0 ORDER BY name", model=model.Berth)
return json.dumps(data, default=model.obj_dict), 200, {'Content-Type': 'application/json; charset=utf-8'}

View File

@ -7,6 +7,7 @@ from ..schemas import model
from ..schemas.model import History
from .. import local_db
from BreCal.database.sql_queries import SQLQuery
def GetHistory(options):
@ -20,6 +21,8 @@ def GetHistory(options):
commands = pydapper.using(pooledConnection)
if "shipcall_id" in options and options["shipcall_id"]:
# query = SQLQuery.get_history()
# data = commands.query(query, model=History.from_query_row, param={"shipcallid" : options["shipcall_id"]})
data = commands.query("SELECT id, participant_id, shipcall_id, timestamp, eta, type, operation FROM history WHERE shipcall_id = ?shipcallid?",
model=History.from_query_row,
param={"shipcallid" : options["shipcall_id"]})

View File

@ -6,6 +6,7 @@ import bcrypt
from ..schemas import model
from .. import local_db
from ..services import jwt_handler
from BreCal.database.sql_queries import SQLQuery
def GetUser(options):
@ -14,11 +15,13 @@ def GetUser(options):
hash = bcrypt.hashpw(options["password"].encode('utf-8'), bcrypt.gensalt( 12 )).decode('utf8')
pooledConnection = local_db.getPoolConnection()
commands = pydapper.using(pooledConnection)
# query = SQLQuery.get_user()
# data = commands.query(query, model=model.User, param={"username" : options["username"]})
data = commands.query("SELECT id, participant_id, first_name, last_name, user_name, user_email, user_phone, password_hash, " +
"api_key, notify_email, notify_whatsapp, notify_signal, notify_popup, created, modified FROM user " +
"WHERE user_name = ?username? OR user_email = ?username?",
model=model.User, param={"username" : options["username"]})
# print(data)
if len(data) == 1:
if bcrypt.checkpw(options["password"].encode("utf-8"), bytes(data[0].password_hash, "utf-8")):
result = {

View File

@ -4,6 +4,7 @@ import pydapper
from ..schemas import model
from .. import local_db
from BreCal.database.sql_queries import SQLQuery
def GetNotifications(options):
"""
@ -16,6 +17,8 @@ def GetNotifications(options):
pooledConnection = local_db.getPoolConnection()
commands = pydapper.using(pooledConnection)
# query = SQLQuery.get_notifications()
# data = commands.query(query, model=model.Notification.from_query_row, param={"scid" : options["shipcall_id"]})
data = commands.query("SELECT id, shipcall_id, level, type, message, created, modified FROM notification " +
"WHERE shipcall_id = ?scid?", model=model.Notification.from_query_row, param={"scid" : options["shipcall_id"]})
pooledConnection.close()

View File

@ -4,6 +4,7 @@ import pydapper
from ..schemas import model
from .. import local_db
from BreCal.database.sql_queries import SQLQuery
def GetParticipant(options):
"""
@ -16,8 +17,10 @@ def GetParticipant(options):
pooledConnection = local_db.getPoolConnection()
commands = pydapper.using(pooledConnection)
if "user_id" in options and options["user_id"]:
# query = SQLQuery.get_participant_by_user_id()
data = commands.query("SELECT p.id as id, p.name as name, p.street as street, p.postal_code as postal_code, p.city as city, p.type as type, p.flags as flags, p.created as created, p.modified as modified, p.deleted as deleted FROM participant p INNER JOIN user u WHERE u.participant_id = p.id and u.id = ?userid?", model=model.Participant, param={"userid" : options["user_id"]})
else:
# query = SQLQuery.get_participants()
data = commands.query("SELECT id, name, street, postal_code, city, type, flags, created, modified, deleted FROM participant p ORDER BY p.name", model=model.Participant)
return json.dumps(data, default=model.obj_dict), 200, {'Content-Type': 'application/json; charset=utf-8'}

View File

@ -8,8 +8,7 @@ from .. import local_db
from ..services.auth_guard import check_jwt
from BreCal.database.update_database import evaluate_shipcall_state
from BreCal.database.sql_queries import create_sql_query_shipcall_get, create_sql_query_shipcall_post, create_sql_query_shipcall_put, create_sql_query_history_post, create_sql_query_history_put
from BreCal.database.sql_queries import create_sql_query_shipcall_get, create_sql_query_shipcall_post, create_sql_query_shipcall_put, create_sql_query_history_post, create_sql_query_history_put, SQLQuery
from marshmallow import Schema, fields, ValidationError
def GetShipcalls(options):
@ -21,10 +20,14 @@ def GetShipcalls(options):
pooledConnection = local_db.getPoolConnection()
commands = pydapper.using(pooledConnection)
# query = SQLQuery.get_shipcalls(options)
query = create_sql_query_shipcall_get(options)
data = commands.query(query, model=model.Shipcall.from_query_row, buffered=True)
for shipcall in data:
# participant_query = SQLQuery.get_participants()
# participants = commands.query(participant_query, model=dict, param={"shipcall_id" : shipcall.id}, buffered=False)
# for record in participants:
participant_query = "SELECT participant_id, type FROM shipcall_participant_map WHERE shipcall_id=?shipcall_id?";
for record in commands.query(participant_query, model=dict, param={"shipcall_id" : shipcall.id}, buffered=False):
# model.Participant_Assignment = model.Participant_Assignment()

View File

@ -4,6 +4,7 @@ import pydapper
from ..schemas import model
from .. import local_db
from BreCal.database.sql_queries import SQLQuery
def GetShips(token):
"""
@ -14,6 +15,8 @@ def GetShips(token):
pooledConnection = local_db.getPoolConnection()
commands = pydapper.using(pooledConnection)
# query = SQLQuery.get_ships()
# data = commands.query(query, model=model.Ship)
data = commands.query("SELECT id, name, imo, callsign, participant_id, length, width, is_tug, bollard_pull, eni, created, modified, deleted FROM ship ORDER BY name", model=model.Ship)
return json.dumps(data, default=model.obj_dict), 200, {'Content-Type': 'application/json; charset=utf-8'}

View File

@ -6,6 +6,7 @@ import pydapper
from ..schemas import model
from .. import local_db
from ..services.auth_guard import check_jwt
from BreCal.database.sql_queries import SQLQuery
from BreCal.database.update_database import evaluate_shipcall_state
@ -20,6 +21,8 @@ def GetTimes(options):
pooledConnection = local_db.getPoolConnection()
commands = pydapper.using(pooledConnection)
# query = SQLQuery.get_times()
# data = commands.query(query, model=model.Times, param={"scid" : options["shipcall_id"]})
data = commands.query("SELECT id, eta_berth, eta_berth_fixed, etd_berth, etd_berth_fixed, lock_time, lock_time_fixed, " +
"zone_entry, zone_entry_fixed, operations_start, operations_end, remarks, shipcall_id, participant_id, " +
"berth_id, berth_info, pier_side, participant_type, created, modified, ata, atd, eta_interval_end, etd_interval_end FROM times " +

View File

@ -5,6 +5,7 @@ import bcrypt
from ..schemas import model
from .. import local_db
from BreCal.database.sql_queries import SQLQuery, create_sql_query_user_put
def PutUser(schemaModel):
"""
@ -21,13 +22,21 @@ def PutUser(schemaModel):
# test if object to update is found
sentinel = object()
# query = SQLQuery.get_user_by_id()
# theuser = commands.query_single_or_default(query, sentinel, param={"id" : schemaModel["id"]}, model=model.User)
theuser = commands.query_single_or_default("SELECT * FROM user where id = ?id?", sentinel, param={"id" : schemaModel["id"]}, model=model.User)
if theuser is sentinel:
pooledConnection.close()
# #TODO: result = {"message":"no such record"} -> json.dumps
return json.dumps("no such record"), 404, {'Content-Type': 'application/json; charset=utf-8'}
# see if we need to update public fields
# #TODO_determine: this filter blocks Put-Requests, which update the 'notify_email', 'notify_whatsapp', 'notify_signal', 'notify_popup' fields
# should this be refactored?
# Also, what about the 'user_name'?
# 'participant_id' would also not trigger an update in isolation
if "first_name" in schemaModel or "last_name" in schemaModel or "user_phone" in schemaModel or "user_email" in schemaModel:
# query = SQLQuery.get_user_put(schemaModel)
query = "UPDATE user SET "
isNotFirst = False
for key in schemaModel.keys():
@ -49,6 +58,7 @@ def PutUser(schemaModel):
if "old_password" in schemaModel and schemaModel["old_password"] and "new_password" in schemaModel and schemaModel["new_password"]:
if bcrypt.checkpw(schemaModel["old_password"].encode("utf-8"), bytes(theuser.password_hash, "utf-8")): # old pw matches
password_hash = bcrypt.hashpw(schemaModel["new_password"].encode('utf-8'), bcrypt.gensalt( 12 )).decode('utf8')
# query = SQLQuery.get_update_user_password()
query = "UPDATE user SET password_hash = ?password_hash? WHERE id = ?id?"
commands.execute(query, param={"password_hash" : password_hash, "id" : schemaModel["id"]})
else:

View File

View File

@ -0,0 +1,287 @@
import pytest
import os
import bcrypt
import pydapper
from BreCal import local_db
from BreCal.database.sql_handler import execute_sql_query_standalone
from BreCal.database.sql_queries import SQLQuery
from BreCal.schemas import model
from BreCal.stubs.user import get_user_simple
instance_path = os.path.join(os.path.expanduser('~'), "brecal", "src", "server", "instance", "instance")
local_db.initPool(os.path.dirname(instance_path), connection_filename="connection_data_local.json")
def test_sql_query_every_call_returns_str():
assert isinstance(SQLQuery.get_berth(), str)
return
def test_sql_query_get_berths():
schemas = execute_sql_query_standalone(query=SQLQuery.get_berth(), param={})
berths = [model.Berth(**schema) for schema in schemas]
assert all([isinstance(berth, model.Berth) for berth in berths]), f"one of the returned schemas is not a Berth object"
return
def test_sql_query_get_history():
options = {"shipcall_id":157}
history = execute_sql_query_standalone(query=SQLQuery.get_history(), param={"shipcallid" : options["shipcall_id"]}, model=model.History.from_query_row)
assert all([isinstance(hist,model.History) for hist in history])
return
def test_sql_query_get_user():
options = {"username":"maxm"}
users = execute_sql_query_standalone(query=SQLQuery.get_user(), param={"username" : options["username"]}, model=model.User)
assert all([isinstance(user,model.User) for user in users])
assert users[0].user_name==options["username"]
return
def test_sql_get_notifications():
import mysql.connector
# unfortunately, there currently is *no* notification in the database.
with pytest.raises(mysql.connector.errors.ProgrammingError, match="Unknown column 'shipcall_id' in 'field list'"):
options = {"shipcall_id":222}
notifications = execute_sql_query_standalone(query=SQLQuery.get_notifications(), param={"scid" : options["shipcall_id"]}, model=model.Notification.from_query_row)
assert all([isinstance(notification,model.Notification) for notification in notifications])
return
def test_sql_get_participants():
participants = execute_sql_query_standalone(query=SQLQuery.get_participants(), param={}, model=model.Participant)
assert all([isinstance(participant,model.Participant) for participant in participants])
return
def test_sql_get_participants():
options = {"user_id":29}
query = SQLQuery.get_participant_by_user_id()
participants = execute_sql_query_standalone(query=query, param={"userid" : options["user_id"]}, model=model.Participant)
assert all([isinstance(participant,model.Participant) for participant in participants])
assert len(participants)==1, f"there should only be one match for the respective user id"
assert participants[0].id == 136, f"user 29 belongs to participant_id 136"
return
def test_sql_get_shipcalls():
from BreCal.database.sql_queries import create_sql_query_shipcall_get
# different styles for the same outcome
options = {"past_days":3000}
query = create_sql_query_shipcall_get(options)
shipcalls = execute_sql_query_standalone(query=query, param=options, model=model.Shipcall.from_query_row)
assert all([isinstance(shipcall,model.Shipcall) for shipcall in shipcalls])
query = SQLQuery.get_shipcalls() # defaults to 'past_days'=3
shipcalls = execute_sql_query_standalone(query=query, param=options, model=model.Shipcall.from_query_row)
assert all([isinstance(shipcall,model.Shipcall) for shipcall in shipcalls])
query = SQLQuery.get_shipcalls({'past_days':3000})
shipcalls = execute_sql_query_standalone(query=query, param=options, model=model.Shipcall.from_query_row)
assert all([isinstance(shipcall,model.Shipcall) for shipcall in shipcalls])
# fails: options must contain 'past_days' key
with pytest.raises(AssertionError, match="there must be a key 'past_days' in the options, which determines"):
query = SQLQuery.get_shipcalls(options={})
shipcalls = execute_sql_query_standalone(query=query, param=options, model=model.Shipcall.from_query_row)
return
def test_sql_get_ships():
ships = execute_sql_query_standalone(query=SQLQuery.get_ships(), model=model.Ship)
assert all([isinstance(ship, model.Ship) for ship in ships])
return
def test_sql_get_times():
options = {'shipcall_id':153}
times = execute_sql_query_standalone(query=SQLQuery.get_times(), model=model.Times, param={"scid" : options["shipcall_id"]})
assert all([isinstance(time_,model.Times) for time_ in times])
assert times[0].shipcall_id==options["shipcall_id"]
return
def test_sql_get_user_by_id():
# success: id 29 exists
schemaModel = get_user_simple().__dict__
schemaModel["id"] = 29
sentinel = object()
query = SQLQuery.get_user_by_id()
pooledConnection = local_db.getPoolConnection()
commands = pydapper.using(pooledConnection)
theuser = commands.query_single_or_default(query, sentinel, param={"id" : schemaModel["id"]}, model=model.User)
if theuser is sentinel:
pooledConnection.close()
theuser.id == schemaModel["id"]
# fails: id 292212 does not exist (returns default, which is the sentinel object in this case)
schemaModel = get_user_simple().__dict__
schemaModel["id"] = 292212
sentinel = object()
query = SQLQuery.get_user_by_id()
pooledConnection = local_db.getPoolConnection()
commands = pydapper.using(pooledConnection)
theuser = commands.query_single_or_default(query, sentinel, param={"id" : schemaModel["id"]}, model=model.User)
if theuser is sentinel:
pooledConnection.close()
assert theuser is sentinel
return
def test_sql_get_user_put():
"""the PUT query for a user must be built based on a schemaModel, as the available data is dynamic and the query must be adaptive."""
schemaModel = get_user_simple().__dict__
schemaModel["id"] = 29
query = "UPDATE user SET "
isNotFirst = False
for key in schemaModel.keys():
if key == "id":
continue
if key == "old_password":
continue
if key == "new_password":
continue
if isNotFirst:
query += ", "
isNotFirst = True
query += key + " = ?" + key + "? "
query += "WHERE id = ?id?"
assert SQLQuery.get_user_put(schemaModel) == query
return
def test_sql_user_put_set_lastname_check_unset_lastname_check():
"""
Simply put, this method updates the last_name of user 29 to "Metz", verifies the change,
and then proceeds to set it back to "Mustermann", and verifies the change.
"""
# 1.) SET the last_name of user_id 29 to 'Metz' by a PUT command
schemaModel = get_user_simple().__dict__
schemaModel["id"] = 29
schemaModel["last_name"] = "Metz" # -> "Metz" -> "Mustermann"
schemaModel = {k:v for k,v in schemaModel.items() if k in ["id", "last_name"]}
query = SQLQuery.get_user_put(schemaModel)
affected_rows = execute_sql_query_standalone(query=query, param=schemaModel, command_type="execute")
assert affected_rows==1, f"at least one row should be affected by this call."
# 2.) GET the user_id 29 and verify the last_name is 'Metz'
sentinel = object()
query = SQLQuery.get_user_by_id()
pooledConnection = local_db.getPoolConnection()
commands = pydapper.using(pooledConnection)
theuser = commands.query_single_or_default(query, sentinel, param={"id" : schemaModel["id"]}, model=model.User)
pooledConnection.close()
assert not theuser is sentinel, f"failed GET user query"
assert theuser.last_name=="Metz", f"PUT command has been unsuccessful."
# 3.) SET the last_name of user_id 29 to 'Mustermann' by a PUT command
schemaModel = theuser.__dict__
schemaModel["last_name"] = "Mustermann"
schemaModel = {k:v for k,v in schemaModel.items() if k in ["id", "last_name"]}
query = SQLQuery.get_user_put(schemaModel)
affected_rows = execute_sql_query_standalone(query=query, param=schemaModel, command_type="execute")
assert affected_rows==1, f"at least one row should be affected by this call."
# 4.) GET the user_id 29 and verify the last_name is 'Mustermann'
sentinel = object()
query = SQLQuery.get_user_by_id()
pooledConnection = local_db.getPoolConnection()
commands = pydapper.using(pooledConnection)
theuser = commands.query_single_or_default(query, sentinel, param={"id" : schemaModel["id"]}, model=model.User)
pooledConnection.close()
assert not theuser is sentinel, f"failed GET user query"
assert theuser.last_name=="Mustermann", f"PUT command has been unsuccessful."
return
def test_sql_user_update_password():
"""This test updates the default password of user 29 from 'Start1234' to 'Start4321' and afterwards sets it back to 'Start1234'."""
# #TODO: this test very openly displays the password of 'maxm'. It makes sense to create a stub user in the database, which can be
# used for these tests, so an account without any importance or valuable assigned role is insecure instead.
# Set. Update the password of user 29 from 'Start1234' to 'Start4321'
schemaModel = get_user_simple().__dict__
schemaModel["id"] = 29
schemaModel["old_password"] = "Start1234"
schemaModel["new_password"] = "Start4321"
sentinel = object()
query = SQLQuery.get_user_by_id()
pooledConnection = local_db.getPoolConnection()
commands = pydapper.using(pooledConnection)
theuser = commands.query_single_or_default(query, sentinel, param={"id" : schemaModel["id"]}, model=model.User)
pooledConnection.close()
assert not theuser is sentinel, f"failed GET user query"
assert bcrypt.checkpw(schemaModel["old_password"].encode("utf-8"), bytes(theuser.password_hash, "utf-8")), f"old password does not match to the database entry"
password_hash = bcrypt.hashpw(schemaModel["new_password"].encode('utf-8'), bcrypt.gensalt( 12 )).decode('utf8')
query = SQLQuery.get_update_user_password()
affected_rows = execute_sql_query_standalone(query=query, param={"password_hash" : password_hash, "id" : schemaModel["id"]}, command_type="execute")
assert affected_rows == 1
# 2.) Revert. Set password back to the default (from 'Start4321' to 'Start1234')
schemaModel = get_user_simple().__dict__
schemaModel["id"] = 29
schemaModel["old_password"] = "Start4321"
schemaModel["new_password"] = "Start1234"
sentinel = object()
query = SQLQuery.get_user_by_id()
pooledConnection = local_db.getPoolConnection()
commands = pydapper.using(pooledConnection)
theuser = commands.query_single_or_default(query, sentinel, param={"id" : schemaModel["id"]}, model=model.User)
pooledConnection.close()
assert not theuser is sentinel, f"failed GET user query"
assert bcrypt.checkpw(schemaModel["old_password"].encode("utf-8"), bytes(theuser.password_hash, "utf-8")), f"old password does not match to the database entry"
password_hash = bcrypt.hashpw(schemaModel["new_password"].encode('utf-8'), bcrypt.gensalt( 12 )).decode('utf8')
query = SQLQuery.get_update_user_password()
affected_rows = execute_sql_query_standalone(query=query, param={"password_hash" : password_hash, "id" : schemaModel["id"]}, command_type="execute")
assert affected_rows == 1
return
def test_sql_get_participants_of_shipcall_id():
shipcall_id = 389
query = SQLQuery.get_participants()
participants = execute_sql_query_standalone(query=query, model=dict, param={"shipcall_id" : shipcall_id})
assert all([part.get("participant_id") is not None for part in participants])
assert all([part.get("type") is not None for part in participants])
# try to convert every participant into a model.Participant_Assignment
participants = [
model.Participant_Assignment(part["participant_id"], part["type"])
for part in participants
]
assert all([isinstance(part,model.Participant_Assignment) for part in participants])
return
def test_sql_get_all_shipcalls_and_assign_participants():
"""
this test reproduces the SQL query within BreCal.impl.shipcalls to make sure, that the
query first returns all shipcalls, and then assigns all participants of the respective shipcall to it.
"""
# get all shipcalls
options = {'past_days':30000}
query = SQLQuery.get_shipcalls(options)
shipcalls = execute_sql_query_standalone(query=query, model=model.Shipcall.from_query_row)
assert all([isinstance(shipcall,model.Shipcall) for shipcall in shipcalls])
# for every shipcall, assign all of its participants to it
for shipcall in shipcalls:
participant_query = SQLQuery.get_participants()
participants = execute_sql_query_standalone(query=participant_query, model=dict, param={"shipcall_id" : shipcall.id})
for record in participants:
pa = model.Participant_Assignment(record["participant_id"], record["type"])
shipcall.participants.append(pa)
assert any([
any([isinstance(participant, model.Participant_Assignment) for participant in shipcall.participants])
for shipcall in shipcalls
]), f"at least one of the shipcalls should have an assigned model.Participant_Assignment"
return
#schemas = execute_sql_query_standalone(query=SQLQuery.get_berth(), param={})