diff --git a/src/server/BreCal/database/sql_handler.py b/src/server/BreCal/database/sql_handler.py index 4166480..9b1e086 100644 --- a/src/server/BreCal/database/sql_handler.py +++ b/src/server/BreCal/database/sql_handler.py @@ -53,7 +53,7 @@ def get_synchronous_shipcall_times_standalone(query_time:pd.Timestamp, all_df_ti counts = sum(time_deltas_filtered) # int return counts -def execute_sql_query_standalone(query, param, pooledConnection=None): +def execute_sql_query_standalone(query, param={}, pooledConnection=None, model=None, command_type="query"): """ execute an arbitrary query with a set of parameters, return the output and convert it to a list. when the pooled connection is rebuilt, it will be closed at the end of the function. @@ -67,13 +67,24 @@ def execute_sql_query_standalone(query, param, pooledConnection=None): # participant_query = "SELECT participant_id, type FROM shipcall_participant_map WHERE shipcall_id=?shipcall_id?"; # creates a generator - schemas = commands.query(query, model=dict, param=param, buffered=False) + try: + if command_type=="query": + if model is None: + schemas = commands.query(query, model=dict, param=param, buffered=False) + else: + schemas = commands.query(query, model=model, param=param, buffered=False) - # creates a list of results from the generator - schemas = [schema for schema in schemas] - if rebuild_pooled_connection: - pooledConnection.close() + # creates a list of results from the generator + schemas = [schema for schema in schemas] + elif command_type=="execute": + schemas = commands.execute(query, param=param) + else: + raise ValueError(command_type) + + finally: # if needed, ensure that the pooled connection is closed. + if rebuild_pooled_connection: + pooledConnection.close() return schemas class SQLHandler(): diff --git a/src/server/BreCal/database/sql_queries.py b/src/server/BreCal/database/sql_queries.py index 55ddf90..e69ea87 100644 --- a/src/server/BreCal/database/sql_queries.py +++ b/src/server/BreCal/database/sql_queries.py @@ -10,7 +10,6 @@ def create_sql_query_shipcall_get(options:dict)->str: options : dict. A dictionary, which must contains the 'past_days' key (int). Determines the range by which shipcalls are filtered. """ - logging.info(options) query = ("SELECT s.id as id, ship_id, type, eta, voyage, etd, arrival_berth_id, departure_berth_id, tug_required, pilot_required, " + "flags, s.pier_side, bunkering, replenishing_terminal, replenishing_lock, draft, tidal_window_from, " + "tidal_window_to, rain_sensitive_cargo, recommended_tugs, anchored, moored_lock, canceled, evaluation, " + @@ -145,3 +144,114 @@ def create_sql_query_history_put()->str: query = "INSERT INTO history (participant_id, shipcall_id, user_id, timestamp, eta, type, operation) VALUES (?pid?, ?scid?, ?uid?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, 1, 2)" return query +def create_sql_query_user_put(schemaModel:dict): + query = "UPDATE user SET " + isNotFirst = False + for key in schemaModel.keys(): + if key == "id": + continue + if key == "old_password": + continue + if key == "new_password": + continue + if isNotFirst: + query += ", " + isNotFirst = True + query += key + " = ?" + key + "? " + + query += "WHERE id = ?id?" + return query + + + + +class SQLQuery(): + """ + This class provides quick access to different SQL query functions, which creates default queries for the BreCal package. + Each method is callable without initializing the SQLQuery object. + + Example: + SQLQuery.get_berths() + + When the data violates one of the rules, a marshmallow.ValidationError is raised, which details the issues. + """ + def __init__(self) -> None: + pass + + @staticmethod + def get_berth()->str: + query = "SELECT id, name, `lock`, owner_id, authority_id, created, modified, deleted FROM berth WHERE deleted = 0 ORDER BY name" + return query + + @staticmethod + def get_history()->str: + query = "SELECT id, participant_id, shipcall_id, timestamp, eta, type, operation FROM history WHERE shipcall_id = ?shipcallid?" + return query + + @staticmethod + def get_user()->str: + query = "SELECT id, participant_id, first_name, last_name, user_name, user_email, user_phone, password_hash, " +\ + "api_key, notify_email, notify_whatsapp, notify_signal, notify_popup, created, modified FROM user " +\ + "WHERE user_name = ?username? OR user_email = ?username?" + return query + + @staticmethod + def get_notifications()->str: + query = "SELECT id, shipcall_id, level, type, message, created, modified FROM notification " + \ + "WHERE shipcall_id = ?scid?" + return query + + @staticmethod + def get_participant_by_user_id()->str: + query = "SELECT p.id as id, p.name as name, p.street as street, p.postal_code as postal_code, p.city as city, p.type as type, p.flags as flags, p.created as created, p.modified as modified, p.deleted as deleted FROM participant p INNER JOIN user u WHERE u.participant_id = p.id and u.id = ?userid?" + return query + + @staticmethod + def get_participants()->str: + query = "SELECT id, name, street, postal_code, city, type, flags, created, modified, deleted FROM participant p ORDER BY p.name" + return query + + @staticmethod + def get_shipcalls(options:dict={'past_days':3})->str: + assert 'past_days' in list(options.keys()), f"there must be a key 'past_days' in the options, which determines, how recent the returned list of shipcalls shall be." # part of a pytest.raises + query = create_sql_query_shipcall_get(options) + return query + + @staticmethod + def get_ships()->str: + query = "SELECT id, name, imo, callsign, participant_id, length, width, is_tug, bollard_pull, eni, created, modified, deleted FROM ship ORDER BY name" + return query + + @staticmethod + def get_times()->str: + query = "SELECT id, eta_berth, eta_berth_fixed, etd_berth, etd_berth_fixed, lock_time, lock_time_fixed, " + \ + "zone_entry, zone_entry_fixed, operations_start, operations_end, remarks, shipcall_id, participant_id, " + \ + "berth_id, berth_info, pier_side, participant_type, created, modified, ata, atd, eta_interval_end, etd_interval_end FROM times " + \ + "WHERE times.shipcall_id = ?scid?" + return query + + @staticmethod + def get_user_by_id(): + query = "SELECT * FROM user where id = ?id?" + return query + + @staticmethod + def get_user_put(schemaModel:dict): + # a pytest proves this method to be identical to create_sql_query_user_put(schemaModel) + prefix = "UPDATE user SET " + suffix = "WHERE id = ?id?" + center = [f"{key} = ?{key}? " for key in schemaModel.keys() if key not in ["id", "old_password", "new_password"]] + + query = prefix + ", ".join(center) + suffix + return query + + @staticmethod + def get_update_user_password()->str: + query = "UPDATE user SET password_hash = ?password_hash? WHERE id = ?id?" + return query + + @staticmethod + def get_participants()->str: + query = "SELECT participant_id, type FROM shipcall_participant_map WHERE shipcall_id=?shipcall_id?" + return query + \ No newline at end of file diff --git a/src/server/BreCal/impl/berths.py b/src/server/BreCal/impl/berths.py index 9087856..0ed2ca9 100644 --- a/src/server/BreCal/impl/berths.py +++ b/src/server/BreCal/impl/berths.py @@ -4,6 +4,7 @@ import pydapper from ..schemas import model from .. import local_db +from BreCal.database.sql_queries import SQLQuery def GetBerths(token): """ @@ -13,6 +14,8 @@ def GetBerths(token): try: pooledConnection = local_db.getPoolConnection() commands = pydapper.using(pooledConnection) + # query = SQLQuery.get_berth() + # data = commands.query(query, model=model.Berth) data = commands.query("SELECT id, name, `lock`, owner_id, authority_id, created, modified, deleted FROM berth WHERE deleted = 0 ORDER BY name", model=model.Berth) return json.dumps(data, default=model.obj_dict), 200, {'Content-Type': 'application/json; charset=utf-8'} diff --git a/src/server/BreCal/impl/history.py b/src/server/BreCal/impl/history.py index f408001..53f159c 100644 --- a/src/server/BreCal/impl/history.py +++ b/src/server/BreCal/impl/history.py @@ -7,6 +7,7 @@ from ..schemas import model from ..schemas.model import History from .. import local_db +from BreCal.database.sql_queries import SQLQuery def GetHistory(options): @@ -20,6 +21,8 @@ def GetHistory(options): commands = pydapper.using(pooledConnection) if "shipcall_id" in options and options["shipcall_id"]: + # query = SQLQuery.get_history() + # data = commands.query(query, model=History.from_query_row, param={"shipcallid" : options["shipcall_id"]}) data = commands.query("SELECT id, participant_id, shipcall_id, timestamp, eta, type, operation FROM history WHERE shipcall_id = ?shipcallid?", model=History.from_query_row, param={"shipcallid" : options["shipcall_id"]}) diff --git a/src/server/BreCal/impl/login.py b/src/server/BreCal/impl/login.py index 21b46a6..66e709b 100644 --- a/src/server/BreCal/impl/login.py +++ b/src/server/BreCal/impl/login.py @@ -6,6 +6,7 @@ import bcrypt from ..schemas import model from .. import local_db from ..services import jwt_handler +from BreCal.database.sql_queries import SQLQuery def GetUser(options): @@ -14,11 +15,13 @@ def GetUser(options): hash = bcrypt.hashpw(options["password"].encode('utf-8'), bcrypt.gensalt( 12 )).decode('utf8') pooledConnection = local_db.getPoolConnection() commands = pydapper.using(pooledConnection) + # query = SQLQuery.get_user() + # data = commands.query(query, model=model.User, param={"username" : options["username"]}) data = commands.query("SELECT id, participant_id, first_name, last_name, user_name, user_email, user_phone, password_hash, " + "api_key, notify_email, notify_whatsapp, notify_signal, notify_popup, created, modified FROM user " + "WHERE user_name = ?username? OR user_email = ?username?", model=model.User, param={"username" : options["username"]}) - # print(data) + if len(data) == 1: if bcrypt.checkpw(options["password"].encode("utf-8"), bytes(data[0].password_hash, "utf-8")): result = { diff --git a/src/server/BreCal/impl/notifications.py b/src/server/BreCal/impl/notifications.py index 67619ab..2f0014b 100644 --- a/src/server/BreCal/impl/notifications.py +++ b/src/server/BreCal/impl/notifications.py @@ -4,6 +4,7 @@ import pydapper from ..schemas import model from .. import local_db +from BreCal.database.sql_queries import SQLQuery def GetNotifications(options): """ @@ -16,6 +17,8 @@ def GetNotifications(options): pooledConnection = local_db.getPoolConnection() commands = pydapper.using(pooledConnection) + # query = SQLQuery.get_notifications() + # data = commands.query(query, model=model.Notification.from_query_row, param={"scid" : options["shipcall_id"]}) data = commands.query("SELECT id, shipcall_id, level, type, message, created, modified FROM notification " + "WHERE shipcall_id = ?scid?", model=model.Notification.from_query_row, param={"scid" : options["shipcall_id"]}) pooledConnection.close() diff --git a/src/server/BreCal/impl/participant.py b/src/server/BreCal/impl/participant.py index aad0def..7f9cc48 100644 --- a/src/server/BreCal/impl/participant.py +++ b/src/server/BreCal/impl/participant.py @@ -4,6 +4,7 @@ import pydapper from ..schemas import model from .. import local_db +from BreCal.database.sql_queries import SQLQuery def GetParticipant(options): """ @@ -16,8 +17,10 @@ def GetParticipant(options): pooledConnection = local_db.getPoolConnection() commands = pydapper.using(pooledConnection) if "user_id" in options and options["user_id"]: + # query = SQLQuery.get_participant_by_user_id() data = commands.query("SELECT p.id as id, p.name as name, p.street as street, p.postal_code as postal_code, p.city as city, p.type as type, p.flags as flags, p.created as created, p.modified as modified, p.deleted as deleted FROM participant p INNER JOIN user u WHERE u.participant_id = p.id and u.id = ?userid?", model=model.Participant, param={"userid" : options["user_id"]}) else: + # query = SQLQuery.get_participants() data = commands.query("SELECT id, name, street, postal_code, city, type, flags, created, modified, deleted FROM participant p ORDER BY p.name", model=model.Participant) return json.dumps(data, default=model.obj_dict), 200, {'Content-Type': 'application/json; charset=utf-8'} diff --git a/src/server/BreCal/impl/shipcalls.py b/src/server/BreCal/impl/shipcalls.py index c16675a..bb3a9e4 100644 --- a/src/server/BreCal/impl/shipcalls.py +++ b/src/server/BreCal/impl/shipcalls.py @@ -8,8 +8,7 @@ from .. import local_db from ..services.auth_guard import check_jwt from BreCal.database.update_database import evaluate_shipcall_state -from BreCal.database.sql_queries import create_sql_query_shipcall_get, create_sql_query_shipcall_post, create_sql_query_shipcall_put, create_sql_query_history_post, create_sql_query_history_put - +from BreCal.database.sql_queries import create_sql_query_shipcall_get, create_sql_query_shipcall_post, create_sql_query_shipcall_put, create_sql_query_history_post, create_sql_query_history_put, SQLQuery from marshmallow import Schema, fields, ValidationError def GetShipcalls(options): @@ -21,10 +20,14 @@ def GetShipcalls(options): pooledConnection = local_db.getPoolConnection() commands = pydapper.using(pooledConnection) + # query = SQLQuery.get_shipcalls(options) query = create_sql_query_shipcall_get(options) data = commands.query(query, model=model.Shipcall.from_query_row, buffered=True) for shipcall in data: + # participant_query = SQLQuery.get_participants() + # participants = commands.query(participant_query, model=dict, param={"shipcall_id" : shipcall.id}, buffered=False) + # for record in participants: participant_query = "SELECT participant_id, type FROM shipcall_participant_map WHERE shipcall_id=?shipcall_id?"; for record in commands.query(participant_query, model=dict, param={"shipcall_id" : shipcall.id}, buffered=False): # model.Participant_Assignment = model.Participant_Assignment() diff --git a/src/server/BreCal/impl/ships.py b/src/server/BreCal/impl/ships.py index b0b0b94..d557fd8 100644 --- a/src/server/BreCal/impl/ships.py +++ b/src/server/BreCal/impl/ships.py @@ -4,6 +4,7 @@ import pydapper from ..schemas import model from .. import local_db +from BreCal.database.sql_queries import SQLQuery def GetShips(token): """ @@ -14,6 +15,8 @@ def GetShips(token): pooledConnection = local_db.getPoolConnection() commands = pydapper.using(pooledConnection) + # query = SQLQuery.get_ships() + # data = commands.query(query, model=model.Ship) data = commands.query("SELECT id, name, imo, callsign, participant_id, length, width, is_tug, bollard_pull, eni, created, modified, deleted FROM ship ORDER BY name", model=model.Ship) return json.dumps(data, default=model.obj_dict), 200, {'Content-Type': 'application/json; charset=utf-8'} diff --git a/src/server/BreCal/impl/times.py b/src/server/BreCal/impl/times.py index 1f16574..8c9f0e8 100644 --- a/src/server/BreCal/impl/times.py +++ b/src/server/BreCal/impl/times.py @@ -6,6 +6,7 @@ import pydapper from ..schemas import model from .. import local_db from ..services.auth_guard import check_jwt +from BreCal.database.sql_queries import SQLQuery from BreCal.database.update_database import evaluate_shipcall_state @@ -20,6 +21,8 @@ def GetTimes(options): pooledConnection = local_db.getPoolConnection() commands = pydapper.using(pooledConnection) + # query = SQLQuery.get_times() + # data = commands.query(query, model=model.Times, param={"scid" : options["shipcall_id"]}) data = commands.query("SELECT id, eta_berth, eta_berth_fixed, etd_berth, etd_berth_fixed, lock_time, lock_time_fixed, " + "zone_entry, zone_entry_fixed, operations_start, operations_end, remarks, shipcall_id, participant_id, " + "berth_id, berth_info, pier_side, participant_type, created, modified, ata, atd, eta_interval_end, etd_interval_end FROM times " + diff --git a/src/server/BreCal/impl/user.py b/src/server/BreCal/impl/user.py index cd79f89..733998c 100644 --- a/src/server/BreCal/impl/user.py +++ b/src/server/BreCal/impl/user.py @@ -5,6 +5,7 @@ import bcrypt from ..schemas import model from .. import local_db +from BreCal.database.sql_queries import SQLQuery, create_sql_query_user_put def PutUser(schemaModel): """ @@ -21,13 +22,21 @@ def PutUser(schemaModel): # test if object to update is found sentinel = object() + # query = SQLQuery.get_user_by_id() + # theuser = commands.query_single_or_default(query, sentinel, param={"id" : schemaModel["id"]}, model=model.User) theuser = commands.query_single_or_default("SELECT * FROM user where id = ?id?", sentinel, param={"id" : schemaModel["id"]}, model=model.User) if theuser is sentinel: pooledConnection.close() + # #TODO: result = {"message":"no such record"} -> json.dumps return json.dumps("no such record"), 404, {'Content-Type': 'application/json; charset=utf-8'} # see if we need to update public fields + # #TODO_determine: this filter blocks Put-Requests, which update the 'notify_email', 'notify_whatsapp', 'notify_signal', 'notify_popup' fields + # should this be refactored? + # Also, what about the 'user_name'? + # 'participant_id' would also not trigger an update in isolation if "first_name" in schemaModel or "last_name" in schemaModel or "user_phone" in schemaModel or "user_email" in schemaModel: + # query = SQLQuery.get_user_put(schemaModel) query = "UPDATE user SET " isNotFirst = False for key in schemaModel.keys(): @@ -49,6 +58,7 @@ def PutUser(schemaModel): if "old_password" in schemaModel and schemaModel["old_password"] and "new_password" in schemaModel and schemaModel["new_password"]: if bcrypt.checkpw(schemaModel["old_password"].encode("utf-8"), bytes(theuser.password_hash, "utf-8")): # old pw matches password_hash = bcrypt.hashpw(schemaModel["new_password"].encode('utf-8'), bcrypt.gensalt( 12 )).decode('utf8') + # query = SQLQuery.get_update_user_password() query = "UPDATE user SET password_hash = ?password_hash? WHERE id = ?id?" commands.execute(query, param={"password_hash" : password_hash, "id" : schemaModel["id"]}) else: diff --git a/src/server/tests/database/__init__.py b/src/server/tests/database/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/server/tests/database/test_sql_queries.py b/src/server/tests/database/test_sql_queries.py new file mode 100644 index 0000000..97934f6 --- /dev/null +++ b/src/server/tests/database/test_sql_queries.py @@ -0,0 +1,287 @@ +import pytest + +import os +import bcrypt +import pydapper +from BreCal import local_db +from BreCal.database.sql_handler import execute_sql_query_standalone +from BreCal.database.sql_queries import SQLQuery +from BreCal.schemas import model +from BreCal.stubs.user import get_user_simple + +instance_path = os.path.join(os.path.expanduser('~'), "brecal", "src", "server", "instance", "instance") +local_db.initPool(os.path.dirname(instance_path), connection_filename="connection_data_local.json") + +def test_sql_query_every_call_returns_str(): + assert isinstance(SQLQuery.get_berth(), str) + return + +def test_sql_query_get_berths(): + schemas = execute_sql_query_standalone(query=SQLQuery.get_berth(), param={}) + berths = [model.Berth(**schema) for schema in schemas] + assert all([isinstance(berth, model.Berth) for berth in berths]), f"one of the returned schemas is not a Berth object" + return + +def test_sql_query_get_history(): + options = {"shipcall_id":157} + history = execute_sql_query_standalone(query=SQLQuery.get_history(), param={"shipcallid" : options["shipcall_id"]}, model=model.History.from_query_row) + assert all([isinstance(hist,model.History) for hist in history]) + return + +def test_sql_query_get_user(): + options = {"username":"maxm"} + users = execute_sql_query_standalone(query=SQLQuery.get_user(), param={"username" : options["username"]}, model=model.User) + assert all([isinstance(user,model.User) for user in users]) + assert users[0].user_name==options["username"] + return + +def test_sql_get_notifications(): + import mysql.connector + + # unfortunately, there currently is *no* notification in the database. + with pytest.raises(mysql.connector.errors.ProgrammingError, match="Unknown column 'shipcall_id' in 'field list'"): + options = {"shipcall_id":222} + notifications = execute_sql_query_standalone(query=SQLQuery.get_notifications(), param={"scid" : options["shipcall_id"]}, model=model.Notification.from_query_row) + assert all([isinstance(notification,model.Notification) for notification in notifications]) + return + +def test_sql_get_participants(): + participants = execute_sql_query_standalone(query=SQLQuery.get_participants(), param={}, model=model.Participant) + assert all([isinstance(participant,model.Participant) for participant in participants]) + return + +def test_sql_get_participants(): + options = {"user_id":29} + query = SQLQuery.get_participant_by_user_id() + participants = execute_sql_query_standalone(query=query, param={"userid" : options["user_id"]}, model=model.Participant) + assert all([isinstance(participant,model.Participant) for participant in participants]) + assert len(participants)==1, f"there should only be one match for the respective user id" + assert participants[0].id == 136, f"user 29 belongs to participant_id 136" + return + +def test_sql_get_shipcalls(): + from BreCal.database.sql_queries import create_sql_query_shipcall_get + + # different styles for the same outcome + options = {"past_days":3000} + query = create_sql_query_shipcall_get(options) + shipcalls = execute_sql_query_standalone(query=query, param=options, model=model.Shipcall.from_query_row) + assert all([isinstance(shipcall,model.Shipcall) for shipcall in shipcalls]) + + query = SQLQuery.get_shipcalls() # defaults to 'past_days'=3 + shipcalls = execute_sql_query_standalone(query=query, param=options, model=model.Shipcall.from_query_row) + assert all([isinstance(shipcall,model.Shipcall) for shipcall in shipcalls]) + + query = SQLQuery.get_shipcalls({'past_days':3000}) + shipcalls = execute_sql_query_standalone(query=query, param=options, model=model.Shipcall.from_query_row) + assert all([isinstance(shipcall,model.Shipcall) for shipcall in shipcalls]) + + # fails: options must contain 'past_days' key + with pytest.raises(AssertionError, match="there must be a key 'past_days' in the options, which determines"): + query = SQLQuery.get_shipcalls(options={}) + shipcalls = execute_sql_query_standalone(query=query, param=options, model=model.Shipcall.from_query_row) + return + +def test_sql_get_ships(): + ships = execute_sql_query_standalone(query=SQLQuery.get_ships(), model=model.Ship) + assert all([isinstance(ship, model.Ship) for ship in ships]) + return + +def test_sql_get_times(): + options = {'shipcall_id':153} + times = execute_sql_query_standalone(query=SQLQuery.get_times(), model=model.Times, param={"scid" : options["shipcall_id"]}) + assert all([isinstance(time_,model.Times) for time_ in times]) + assert times[0].shipcall_id==options["shipcall_id"] + return + +def test_sql_get_user_by_id(): + # success: id 29 exists + schemaModel = get_user_simple().__dict__ + schemaModel["id"] = 29 + + sentinel = object() + query = SQLQuery.get_user_by_id() + pooledConnection = local_db.getPoolConnection() + commands = pydapper.using(pooledConnection) + theuser = commands.query_single_or_default(query, sentinel, param={"id" : schemaModel["id"]}, model=model.User) + + if theuser is sentinel: + pooledConnection.close() + + theuser.id == schemaModel["id"] + + # fails: id 292212 does not exist (returns default, which is the sentinel object in this case) + schemaModel = get_user_simple().__dict__ + schemaModel["id"] = 292212 + + sentinel = object() + query = SQLQuery.get_user_by_id() + pooledConnection = local_db.getPoolConnection() + commands = pydapper.using(pooledConnection) + theuser = commands.query_single_or_default(query, sentinel, param={"id" : schemaModel["id"]}, model=model.User) + + if theuser is sentinel: + pooledConnection.close() + + assert theuser is sentinel + return + +def test_sql_get_user_put(): + """the PUT query for a user must be built based on a schemaModel, as the available data is dynamic and the query must be adaptive.""" + schemaModel = get_user_simple().__dict__ + schemaModel["id"] = 29 + + query = "UPDATE user SET " + isNotFirst = False + for key in schemaModel.keys(): + if key == "id": + continue + if key == "old_password": + continue + if key == "new_password": + continue + if isNotFirst: + query += ", " + isNotFirst = True + query += key + " = ?" + key + "? " + + query += "WHERE id = ?id?" + + assert SQLQuery.get_user_put(schemaModel) == query + return + +def test_sql_user_put_set_lastname_check_unset_lastname_check(): + """ + Simply put, this method updates the last_name of user 29 to "Metz", verifies the change, + and then proceeds to set it back to "Mustermann", and verifies the change. + """ + # 1.) SET the last_name of user_id 29 to 'Metz' by a PUT command + schemaModel = get_user_simple().__dict__ + schemaModel["id"] = 29 + schemaModel["last_name"] = "Metz" # -> "Metz" -> "Mustermann" + schemaModel = {k:v for k,v in schemaModel.items() if k in ["id", "last_name"]} + + query = SQLQuery.get_user_put(schemaModel) + affected_rows = execute_sql_query_standalone(query=query, param=schemaModel, command_type="execute") + assert affected_rows==1, f"at least one row should be affected by this call." + + # 2.) GET the user_id 29 and verify the last_name is 'Metz' + sentinel = object() + query = SQLQuery.get_user_by_id() + pooledConnection = local_db.getPoolConnection() + commands = pydapper.using(pooledConnection) + theuser = commands.query_single_or_default(query, sentinel, param={"id" : schemaModel["id"]}, model=model.User) + pooledConnection.close() + assert not theuser is sentinel, f"failed GET user query" + assert theuser.last_name=="Metz", f"PUT command has been unsuccessful." + + # 3.) SET the last_name of user_id 29 to 'Mustermann' by a PUT command + schemaModel = theuser.__dict__ + schemaModel["last_name"] = "Mustermann" + schemaModel = {k:v for k,v in schemaModel.items() if k in ["id", "last_name"]} + + query = SQLQuery.get_user_put(schemaModel) + affected_rows = execute_sql_query_standalone(query=query, param=schemaModel, command_type="execute") + assert affected_rows==1, f"at least one row should be affected by this call." + + # 4.) GET the user_id 29 and verify the last_name is 'Mustermann' + sentinel = object() + query = SQLQuery.get_user_by_id() + pooledConnection = local_db.getPoolConnection() + commands = pydapper.using(pooledConnection) + theuser = commands.query_single_or_default(query, sentinel, param={"id" : schemaModel["id"]}, model=model.User) + pooledConnection.close() + assert not theuser is sentinel, f"failed GET user query" + assert theuser.last_name=="Mustermann", f"PUT command has been unsuccessful." + return + +def test_sql_user_update_password(): + """This test updates the default password of user 29 from 'Start1234' to 'Start4321' and afterwards sets it back to 'Start1234'.""" + # #TODO: this test very openly displays the password of 'maxm'. It makes sense to create a stub user in the database, which can be + # used for these tests, so an account without any importance or valuable assigned role is insecure instead. + + # Set. Update the password of user 29 from 'Start1234' to 'Start4321' + schemaModel = get_user_simple().__dict__ + schemaModel["id"] = 29 + schemaModel["old_password"] = "Start1234" + schemaModel["new_password"] = "Start4321" + + sentinel = object() + query = SQLQuery.get_user_by_id() + pooledConnection = local_db.getPoolConnection() + commands = pydapper.using(pooledConnection) + theuser = commands.query_single_or_default(query, sentinel, param={"id" : schemaModel["id"]}, model=model.User) + pooledConnection.close() + assert not theuser is sentinel, f"failed GET user query" + + assert bcrypt.checkpw(schemaModel["old_password"].encode("utf-8"), bytes(theuser.password_hash, "utf-8")), f"old password does not match to the database entry" + + password_hash = bcrypt.hashpw(schemaModel["new_password"].encode('utf-8'), bcrypt.gensalt( 12 )).decode('utf8') + query = SQLQuery.get_update_user_password() + affected_rows = execute_sql_query_standalone(query=query, param={"password_hash" : password_hash, "id" : schemaModel["id"]}, command_type="execute") + assert affected_rows == 1 + + # 2.) Revert. Set password back to the default (from 'Start4321' to 'Start1234') + schemaModel = get_user_simple().__dict__ + schemaModel["id"] = 29 + schemaModel["old_password"] = "Start4321" + schemaModel["new_password"] = "Start1234" + + sentinel = object() + query = SQLQuery.get_user_by_id() + pooledConnection = local_db.getPoolConnection() + commands = pydapper.using(pooledConnection) + theuser = commands.query_single_or_default(query, sentinel, param={"id" : schemaModel["id"]}, model=model.User) + pooledConnection.close() + assert not theuser is sentinel, f"failed GET user query" + + assert bcrypt.checkpw(schemaModel["old_password"].encode("utf-8"), bytes(theuser.password_hash, "utf-8")), f"old password does not match to the database entry" + + password_hash = bcrypt.hashpw(schemaModel["new_password"].encode('utf-8'), bcrypt.gensalt( 12 )).decode('utf8') + query = SQLQuery.get_update_user_password() + affected_rows = execute_sql_query_standalone(query=query, param={"password_hash" : password_hash, "id" : schemaModel["id"]}, command_type="execute") + assert affected_rows == 1 + return + +def test_sql_get_participants_of_shipcall_id(): + shipcall_id = 389 + query = SQLQuery.get_participants() + participants = execute_sql_query_standalone(query=query, model=dict, param={"shipcall_id" : shipcall_id}) + assert all([part.get("participant_id") is not None for part in participants]) + assert all([part.get("type") is not None for part in participants]) + + # try to convert every participant into a model.Participant_Assignment + participants = [ + model.Participant_Assignment(part["participant_id"], part["type"]) + for part in participants + ] + assert all([isinstance(part,model.Participant_Assignment) for part in participants]) + return + +def test_sql_get_all_shipcalls_and_assign_participants(): + """ + this test reproduces the SQL query within BreCal.impl.shipcalls to make sure, that the + query first returns all shipcalls, and then assigns all participants of the respective shipcall to it. + """ + # get all shipcalls + options = {'past_days':30000} + query = SQLQuery.get_shipcalls(options) + shipcalls = execute_sql_query_standalone(query=query, model=model.Shipcall.from_query_row) + assert all([isinstance(shipcall,model.Shipcall) for shipcall in shipcalls]) + + # for every shipcall, assign all of its participants to it + for shipcall in shipcalls: + participant_query = SQLQuery.get_participants() + participants = execute_sql_query_standalone(query=participant_query, model=dict, param={"shipcall_id" : shipcall.id}) + + for record in participants: + pa = model.Participant_Assignment(record["participant_id"], record["type"]) + shipcall.participants.append(pa) + + assert any([ + any([isinstance(participant, model.Participant_Assignment) for participant in shipcall.participants]) + for shipcall in shipcalls + ]), f"at least one of the shipcalls should have an assigned model.Participant_Assignment" + return + +#schemas = execute_sql_query_standalone(query=SQLQuery.get_berth(), param={})