diff --git a/src/server/BreCal/database/sql_handler.py b/src/server/BreCal/database/sql_handler.py index 9b1e086..2d85175 100644 --- a/src/server/BreCal/database/sql_handler.py +++ b/src/server/BreCal/database/sql_handler.py @@ -79,6 +79,15 @@ def execute_sql_query_standalone(query, param={}, pooledConnection=None, model=N elif command_type=="execute": schemas = commands.execute(query, param=param) + elif command_type=="single": + sentinel = object() + + # pulls a *single* row from the query. Typically, these queries require an ID within the param dictionary. + # when providing a model, such as model.Shipcall, the dataset is immediately translated into a data model. + schemas = commands.query_single_or_default(query, sentinel, param=param) if model is None else commands.query_single_or_default(query, sentinel, param=param, model=model) + if schemas is sentinel: + raise Exception("no such record") + else: raise ValueError(command_type) diff --git a/src/server/BreCal/database/sql_queries.py b/src/server/BreCal/database/sql_queries.py index e69ea87..51a3dea 100644 --- a/src/server/BreCal/database/sql_queries.py +++ b/src/server/BreCal/database/sql_queries.py @@ -25,25 +25,6 @@ def create_sql_query_shipcall_get(options:dict)->str: "(etd >= DATE(NOW() - INTERVAL %d DAY)))) " + "ORDER BY eta") % (options["past_days"], options["past_days"], options["past_days"], options["past_days"]) - """ - alternatively, f-strings could be used. - query_two = ("SELECT s.id as id, ship_id, type, eta, voyage, etd, arrival_berth_id, departure_berth_id, tug_required, pilot_required, " + - "flags, s.pier_side, bunkering, replenishing_terminal, replenishing_lock, draft, tidal_window_from, " + - "tidal_window_to, rain_sensitive_cargo, recommended_tugs, anchored, moored_lock, canceled, evaluation, " + - "evaluation_message, evaluation_time, evaluation_notifications_sent, s.created as created, s.modified as modified, time_ref_point " + - "FROM shipcall s " + - "LEFT JOIN times t ON t.shipcall_id = s.id AND t.participant_type = 8 " + - "WHERE " + - "(type = 1 AND " + - f"((t.id IS NOT NULL AND t.eta_berth >= DATE(NOW() - INTERVAL {options['past_days']} DAY)) OR " + - f"(eta >= DATE(NOW() - INTERVAL {options['past_days']} DAY)))) OR " + - "((type = 2 OR type = 3) AND " + - f"((t.id IS NOT NULL AND t.etd_berth >= DATE(NOW() - INTERVAL {options['past_days']} DAY)) OR " + - f"(etd >= DATE(NOW() - INTERVAL {options['past_days']} DAY)))) " + - "ORDER BY eta") - - assert query==query_two - """ return query @@ -100,7 +81,7 @@ def create_sql_query_shipcall_post(schemaModel:dict)->str: isNotFirst = True query += "?" + param_key + "?" query += ")" - return + return query def create_sql_query_shipcall_put(schemaModel:dict)->str: query = "UPDATE shipcall SET " @@ -162,6 +143,53 @@ def create_sql_query_user_put(schemaModel:dict): query += "WHERE id = ?id?" return query +def create_sql_query_ship_post(schemaModel:dict): + query = "INSERT INTO ship (" + isNotFirst = False + for key in schemaModel.keys(): + if key == "id": + continue + if key == "created": + continue + if key == "modified": + continue + if isNotFirst: + query += "," + isNotFirst = True + query += key + query += ") VALUES (" + isNotFirst = False + for key in schemaModel.keys(): + if key == "id": + continue + if key == "created": + continue + if key == "modified": + continue + if isNotFirst: + query += "," + isNotFirst = True + query += "?" + key + "?" + query += ")" + return query + +def create_sql_query_ship_put(schemaModel:dict): + query = "UPDATE ship SET " + isNotFirst = False + for key in schemaModel.keys(): + if key == "id": + continue + if key == "created": + continue + if key == "modified": + continue + if isNotFirst: + query += ", " + isNotFirst = True + query += key + " = ?" + key + "? " + + query += "WHERE id = ?id?" + return query @@ -213,8 +241,24 @@ class SQLQuery(): @staticmethod def get_shipcalls(options:dict={'past_days':3})->str: + # a pytest proves this method to be identical to create_sql_query_shipcall_get(options) assert 'past_days' in list(options.keys()), f"there must be a key 'past_days' in the options, which determines, how recent the returned list of shipcalls shall be." # part of a pytest.raises - query = create_sql_query_shipcall_get(options) + past_days = options['past_days'] + + query = ("SELECT s.id as id, ship_id, type, eta, voyage, etd, arrival_berth_id, departure_berth_id, tug_required, pilot_required, " + \ + "flags, s.pier_side, bunkering, replenishing_terminal, replenishing_lock, draft, tidal_window_from, " + \ + "tidal_window_to, rain_sensitive_cargo, recommended_tugs, anchored, moored_lock, canceled, evaluation, " + \ + "evaluation_message, evaluation_time, evaluation_notifications_sent, s.created as created, s.modified as modified, time_ref_point " + \ + "FROM shipcall s " + \ + "LEFT JOIN times t ON t.shipcall_id = s.id AND t.participant_type = 8 " + \ + "WHERE " + \ + "(type = 1 AND " + \ + f"((t.id IS NOT NULL AND t.eta_berth >= DATE(NOW() - INTERVAL {past_days} DAY)) OR " + \ + f"(eta >= DATE(NOW() - INTERVAL {past_days} DAY)))) OR " + \ + "((type = 2 OR type = 3) AND " + \ + f"((t.id IS NOT NULL AND t.etd_berth >= DATE(NOW() - INTERVAL {past_days} DAY)) OR " + \ + f"(etd >= DATE(NOW() - INTERVAL {past_days} DAY)))) " + \ + "ORDER BY eta") return query @staticmethod @@ -254,4 +298,121 @@ class SQLQuery(): def get_participants()->str: query = "SELECT participant_id, type FROM shipcall_participant_map WHERE shipcall_id=?shipcall_id?" return query - \ No newline at end of file + + @staticmethod + def get_shipcall_post(schemaModel:dict)->str: + # a pytest proves this method to be identical to create_sql_query_shipcall_post(schemaModel) + + param_keys = {key:key for key in schemaModel.keys()} + param_keys["type"] = "type_value" + param_keys["evaluation"] = "evaluation_value" + + prefix = "INSERT INTO shipcall (" + bridge = ") VALUES (" + suffix = ")" + + stage1 = ",".join([key for key in schemaModel.keys() if not key in ["id","participants","created","modified","evaluation","evaluation_message","type_value","evaluation_value"]]) + stage2 = ",".join([f"?{param_keys.get(key)}?" for key in schemaModel.keys() if not key in ["id","participants","created","modified","evaluation","evaluation_message","type_value","evaluation_value"]]) + + query = prefix+stage1+bridge+stage2+suffix + return query + + @staticmethod + def get_last_insert_id()->str: + query = "select last_insert_id()" + return query + + @staticmethod + def get_shipcall_post_last_insert_id()->str: + """alias function. May be deleted soon""" + query = SQLQuery.get_last_insert_id() + return query + + @staticmethod + def get_shipcall_post_update_shipcall_participant_map()->str: + query = "INSERT INTO shipcall_participant_map (shipcall_id, participant_id, type) VALUES (?shipcall_id?, ?participant_id?, ?type?)" + return query + + @staticmethod + def create_sql_query_history_post()->str: + query = create_sql_query_history_post() + return query + + @staticmethod + def get_shipcall_by_id()->str: + query = "SELECT * FROM shipcall where id = ?id?" + return query + + @staticmethod + def get_shipcall_put(schemaModel:dict)->str: + # a pytest proves this method to be identical to create_sql_query_shipcall_put(schemaModel) + param_keys = {key:key for key in schemaModel.keys()} + param_keys["type"] = "type_value" + param_keys["evaluation"] = "evaluation_value" + + prefix = "UPDATE shipcall SET " + suffix = "WHERE id = ?id?" + body = ", ".join([f"{key} = ?{param_keys.get(key)}? " for key in schemaModel.keys() if key not in ["id", "participants", "created", "modified", "evaluation", "evaluation_message", "type_value", "evaluation_value"]]) + + query = prefix + body + suffix + return query + + @staticmethod + def get_shipcall_participant_map_by_shipcall_id()->str: + query = "SELECT id, participant_id, type FROM shipcall_participant_map where shipcall_id = ?id?" + return query + + @staticmethod + def get_shipcall_participant_map_delete_by_id()->str: + query = "DELETE FROM shipcall_participant_map WHERE id = ?existing_id?" + return query + + @staticmethod + def create_sql_query_history_put()->str: + query = create_sql_query_history_put() + return query + + @staticmethod + def get_ship_post(schemaModel:dict)->str: + # a pytest proves this method to be identical to create_sql_query_ship_post(schemaModel) + prefix = "INSERT INTO ship (" + suffix = ")" + bridge = ") VALUES (" + + stage1 = ",".join([key for key in schemaModel.keys() if not key in ["id", "created", "modified"]]) + stage2 = ",".join([f"?{key}?" for key in schemaModel.keys() if not key in ["id", "created", "modified"]]) + + query = prefix + stage1 + bridge + stage2 + suffix + return query + + @staticmethod + def get_ship_put(schemaModel:dict)->str: + # a pytest proves this method to be identical to create_sql_query_ship_put(schemaModel) + prefix = "UPDATE ship SET " + suffix = "WHERE id = ?id?" + body = ", ".join([f"{key} = ?{key}? " for key in schemaModel.keys() if not key in ["id","created","modified"]]) + + query = prefix + body + suffix + return query + + @staticmethod + def get_ship_delete_by_id()->str: + query = "UPDATE ship SET deleted = 1 WHERE id = ?id?" + return query + + @staticmethod + def get_notification_post()->str: + raise NotImplementedError() + # #TODO: this query is wrong and just a proxy for a POST request + query = "INSERT INTO shipcall_participant_map (shipcall_id, participant_id, type) VALUES (?shipcall_id?, ?participant_id?, ?type?)" + return query + + @staticmethod + def get_shipcall_put_notification_state()->str: + raise NotImplementedError() + # #TODO: use evaluation_notifications_sent here and consider only the shipcall_id + # #TODO: query + query = ... + return query + + diff --git a/src/server/BreCal/database/update_database.py b/src/server/BreCal/database/update_database.py index 4c120f3..395f725 100644 --- a/src/server/BreCal/database/update_database.py +++ b/src/server/BreCal/database/update_database.py @@ -34,6 +34,7 @@ def update_shipcall_in_mysql_database(sql_connection, shipcall:Shipcall, relevan def build_mysql_query_to_update_shipcall(shipcall, relevant_keys:list): """builds a mysql query, which updates the shipcall table. In particular, the provided shipcall will be updated for each key in {relevant_keys}""" + # #TODO: refactor into SQLQuery schemaModel = shipcall.__dict__ # prepare prefix and suffix. Then build the body of the query @@ -55,7 +56,6 @@ def update_all_shipcalls_in_mysql_database(sql_connection, sql_handler:SQLHandle sql_handler: an SQLHandler instance shipcall_df: dataframe, which stores the data that is used to retrieve the shipcall data models (that are then updated in the database) """ - print(shipcall_df) for shipcall_id in shipcall_df.index: shipcall = sql_handler.df_loc_to_data_model(df=shipcall_df, id=shipcall_id, model_str="shipcall") update_shipcall_in_mysql_database(sql_connection, shipcall=shipcall, relevant_keys = ["evaluation", "evaluation_message"]) diff --git a/src/server/BreCal/impl/shipcalls.py b/src/server/BreCal/impl/shipcalls.py index bb3a9e4..83385cf 100644 --- a/src/server/BreCal/impl/shipcalls.py +++ b/src/server/BreCal/impl/shipcalls.py @@ -73,7 +73,7 @@ def PostShipcalls(schemaModel): pooledConnection = local_db.getPoolConnection() commands = pydapper.using(pooledConnection) - # query = create_sql_query_shipcall_post(schemaModel) + # query = SQLQuery.get_shipcall_post(schemaModel) # create_sql_query_shipcall_post(schemaModel) query = "INSERT INTO shipcall (" isNotFirst = False for key in schemaModel.keys(): @@ -126,22 +126,15 @@ def PostShipcalls(schemaModel): isNotFirst = True query += "?" + param_key + "?" query += ")" - - # #TODO: enter completeness validation here. Only shipcalls, where all required fields are filled in are valid - # "Pflichtfelder: z.B. die Felder bei der Neuanlage eines shipcalls müssen entsprechend des Anlauf-Typs belegt sein, damit gespeichert werden kann" - Daniel Schick - - # #TODO: enter role validation here. Only users of correct type should be allowed to post a shipcall - - # #TODO: enter reference validation here - # full_id_existances = check_id_existances(mysql_connector_instance=sql_connection, schemaModel=schemaModel, debug=False) - # if not full_id_existances: - # pooledConnection.close() - # return json.dumps({"message" : "call failed. missing mandatory keywords."}), 500, {'Content-Type': 'application/json; charset=utf-8'} commands.execute(query, schemaModel) + + # lquery = SQLQuery.get_shipcall_post_last_insert_id() + # new_id = commands.execute_scalar(lquery) new_id = commands.execute_scalar("select last_insert_id()") # add participant assignments if we have a list of participants if 'participants' in schemaModel: + # pquery = SQLQuery.get_shipcall_post_update_shipcall_participant_map() pquery = "INSERT INTO shipcall_participant_map (shipcall_id, participant_id, type) VALUES (?shipcall_id?, ?participant_id?, ?type?)" for participant_assignment in schemaModel["participants"]: commands.execute(pquery, param={"shipcall_id" : new_id, "participant_id" : participant_assignment["participant_id"], "type" : participant_assignment["type"]}) @@ -152,7 +145,7 @@ def PostShipcalls(schemaModel): # save history data # TODO: set ETA properly user_data = check_jwt() - # query = create_sql_query_history_post() + # query = SQLQuery.create_sql_query_history_post() query = "INSERT INTO history (participant_id, shipcall_id, user_id, timestamp, eta, type, operation) VALUES (?pid?, ?scid?, ?uid?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, 1, 1)" commands.execute(query, {"scid" : new_id, "pid" : user_data["participant_id"], "uid" : user_data["id"]}) @@ -193,12 +186,14 @@ def PutShipcalls(schemaModel): # test if object to update is found sentinel = object() + # query = SQLQuery.get_shipcall_by_id() + # theshipcall = commands.query_single_or_default(query, sentinel, param={"id" : schemaModel["id"]}) theshipcall = commands.query_single_or_default("SELECT * FROM shipcall where id = ?id?", sentinel, param={"id" : schemaModel["id"]}) if theshipcall is sentinel: pooledConnection.close() return json.dumps("no such record"), 404, {'Content-Type': 'application/json; charset=utf-8'} - # query = create_sql_query_shipcall_put(schemaModel) + # query = SQLQuery.get_shipcall_put(schemaModel) query = "UPDATE shipcall SET " isNotFirst = False for key in schemaModel.keys(): @@ -230,18 +225,9 @@ def PutShipcalls(schemaModel): query += "WHERE id = ?id?" - # #TODO: enter role validation here. - - # #TODO: enter completeness validation here. Only shipcalls, where all required fields are filled in are valid - - - # #TODO: enter reference validation here - # full_id_existances = check_id_existances(mysql_connector_instance=sql_connection, schemaModel=schemaModel, debug=False) - # if not full_id_existances: - # return ... (500) - affected_rows = commands.execute(query, param=schemaModel) + # pquery = SQLQuery.get_shipcall_participant_map_by_shipcall_id() pquery = "SELECT id, participant_id, type FROM shipcall_participant_map where shipcall_id = ?id?" pdata = commands.query(pquery,param={"id" : schemaModel["id"]}) # existing list of assignments @@ -254,6 +240,7 @@ def PutShipcalls(schemaModel): found_participant = True break if not found_participant: + # nquery = SQLQuery.get_shipcall_post_update_shipcall_participant_map() nquery = "INSERT INTO shipcall_participant_map (shipcall_id, participant_id, type) VALUES (?shipcall_id?, ?participant_id?, ?type?)" commands.execute(nquery, param={"shipcall_id" : schemaModel["id"], "participant_id" : participant_assignment["participant_id"], "type" : participant_assignment["type"]}) @@ -265,15 +252,13 @@ def PutShipcalls(schemaModel): found_participant = True break; if not found_participant: + # dquery = SQLQuery.get_shipcall_participant_map_delete_by_id() dquery = "DELETE FROM shipcall_participant_map WHERE id = ?existing_id?" commands.execute(dquery, param={"existing_id" : elem["id"]}) - # apply 'Traffic Light' evaluation to obtain 'GREEN', 'YELLOW' or 'RED' evaluation state. The function internally updates the mysql database - # evaluate_shipcall_state(mysql_connector_instance=pooledConnection, shipcall_id=schemaModel["id"]) # schemaModel["id"] refers to the shipcall id - # save history data # TODO: set ETA properly - # query = create_sql_query_history_put() + # query = SQLQuery.create_sql_query_history_put() query = "INSERT INTO history (participant_id, shipcall_id, user_id, timestamp, eta, type, operation) VALUES (?pid?, ?scid?, ?uid?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, 1, 2)" commands.execute(query, {"scid" : schemaModel["id"], "pid" : user_data["participant_id"], "uid" : user_data["id"]}) diff --git a/src/server/BreCal/impl/ships.py b/src/server/BreCal/impl/ships.py index d557fd8..0733071 100644 --- a/src/server/BreCal/impl/ships.py +++ b/src/server/BreCal/impl/ships.py @@ -49,6 +49,7 @@ def PostShip(schemaModel): pooledConnection = local_db.getPoolConnection() commands = pydapper.using(pooledConnection) + # query = SQLQuery.create_sql_query_ship_post(schemaModel) query = "INSERT INTO ship (" isNotFirst = False for key in schemaModel.keys(): @@ -78,6 +79,8 @@ def PostShip(schemaModel): query += ")" commands.execute(query, schemaModel) + # nquery = SQLQuery.get_last_insert_id() + # new_id = commands.execute_scalar(nquery) new_id = commands.execute_scalar("select last_insert_id()") pooledConnection.close() @@ -103,6 +106,7 @@ def PutShip(schemaModel): pooledConnection = local_db.getPoolConnection() commands = pydapper.using(pooledConnection) + # query = SQLQuery.create_sql_query_ship_put(schemaModel) query = "UPDATE ship SET " isNotFirst = False for key in schemaModel.keys(): @@ -143,6 +147,8 @@ def DeleteShip(options): pooledConnection = local_db.getPoolConnection() commands = pydapper.using(pooledConnection) + # query = SQLQuery.get_ship_delete_by_id() + # affected_rows = commands.execute(query, param={"id" : options["id"]}) affected_rows = commands.execute("UPDATE ship SET deleted = 1 WHERE id = ?id?", param={"id" : options["id"]}) pooledConnection.close() diff --git a/src/server/BreCal/schemas/model.py b/src/server/BreCal/schemas/model.py index d6683a7..92232e2 100644 --- a/src/server/BreCal/schemas/model.py +++ b/src/server/BreCal/schemas/model.py @@ -54,10 +54,19 @@ class EvaluationType(IntEnum): def _missing_(cls, value): return cls.undefined -class NotificationType(IntEnum): +class NotificationType(IntEnum): + """ + Any user has the attributes + 'notify_email' -> NotificationType.email + 'notify_popup' -> NotificationType.push + 'notify_whatsapp' -> undeclared + 'notify_signal' -> undeclared + """ undefined = 0 email = 1 push = 2 + # whatsapp = 3 + # signal = 4 @classmethod def _missing_(cls, value): @@ -118,11 +127,17 @@ class GetVerifyInlineResp(Schema): @dataclass class Notification: + """ + Base data class for any notification. + + Description: + 'An entry corresponds to an alarm given by a violated rule during times update' + """ id: int - shipcall_id: int - level: int - type: NotificationType - message: str + shipcall_id: int # 'shipcall record that caused the notification' + level: int # 'severity of the notification' + type: NotificationType # 'type of the notification' + message: str # 'individual message' created: datetime modified: datetime @@ -436,6 +451,7 @@ class UserSchema(Schema): user_email = fields.String(allow_none=True, metadata={'Required':False}, validate=[validate.Length(max=64)]) old_password = fields.String(allow_none=True, metadata={'Required':False}, validate=[validate.Length(max=128)]) new_password = fields.String(allow_none=True, metadata={'Required':False}, validate=[validate.Length(min=6, max=128)]) + # #TODO: the user schema does not (yet) include the 'notify_' fields @validates("user_phone") def validate_user_phone(self, value): @@ -488,10 +504,10 @@ class User: user_phone: str password_hash: str api_key: str - notify_email: bool - notify_whatsapp: bool - notify_signal: bool - notify_popup: bool + notify_email: bool # #TODO_clarify: should we use an IntFlag for multi-assignment? + notify_whatsapp: bool # #TODO_clarify: should we use an IntFlag for multi-assignment? + notify_signal: bool # #TODO_clarify: should we use an IntFlag for multi-assignment? + notify_popup: bool # #TODO_clarify: should we use an IntFlag for multi-assignment? created: datetime modified: datetime diff --git a/src/server/BreCal/stubs/shipcall.py b/src/server/BreCal/stubs/shipcall.py index 1400112..299fdbe 100644 --- a/src/server/BreCal/stubs/shipcall.py +++ b/src/server/BreCal/stubs/shipcall.py @@ -201,3 +201,7 @@ def get_stub_shipcall_arrival_invalid_missing_type(): post_data.pop("type", None) return post_data +def get_stub_valid_ship_loaded_model(post_data): + from BreCal.schemas import model + loadedModel = model.ShipcallSchema().load(data=post_data, many=False, partial=True) + return loadedModel diff --git a/src/server/tests/database/test_sql_queries.py b/src/server/tests/database/test_sql_queries.py index 97934f6..f490eed 100644 --- a/src/server/tests/database/test_sql_queries.py +++ b/src/server/tests/database/test_sql_queries.py @@ -284,4 +284,244 @@ def test_sql_get_all_shipcalls_and_assign_participants(): ]), f"at least one of the shipcalls should have an assigned model.Participant_Assignment" return +def test_sqlquery_get_shipcal_post_identical_to_create_sql_query_shipcall_post(): + from BreCal.database.sql_queries import create_sql_query_shipcall_post + from BreCal.stubs.shipcall import get_stub_valid_shipcall_shifting, get_stub_valid_ship_loaded_model + + post_data = get_stub_valid_shipcall_shifting() + schemaModel = get_stub_valid_ship_loaded_model(post_data) + query1 = SQLQuery.get_shipcall_post(schemaModel) # refactored variant of create_sql_query_shipcall_post (more concise) + query2 = create_sql_query_shipcall_post(schemaModel) + + assert query1==query2 + return + +def test_sql_post_shipcall(): + """issues a post-request with stub data and adds it to the database.""" + from BreCal.database.sql_queries import create_sql_query_shipcall_post + from BreCal.stubs.shipcall import get_stub_valid_shipcall_shifting, get_stub_valid_ship_loaded_model + + pooledConnection = local_db.getPoolConnection() + try: + commands = pydapper.using(pooledConnection) + + post_data = get_stub_valid_shipcall_shifting() + post_data["voyage"] = "pytestRS71" # perform tagging to identify the shipcalls created by pytests (<16 characters, no special characters). + schemaModel = get_stub_valid_ship_loaded_model(post_data) + query = SQLQuery.get_shipcall_post(schemaModel) # refactored variant of create_sql_query_shipcall_post (more concise) + schemas = execute_sql_query_standalone(query=query, param=schemaModel, command_type="execute", pooledConnection=pooledConnection) + assert schemas==1, f"unsuccessful query execution. Query: {query}" + + # within the same pooledConnection, ask for the last inserted id + query = SQLQuery.get_shipcall_post_last_insert_id() + new_id = commands.execute_scalar(query) + assert new_id > 0, f"the new id should be unlike 0.." + + # add participant assignments if we have a list of participants + if 'participants' in schemaModel: + pquery = SQLQuery.get_shipcall_post_update_shipcall_participant_map() + for participant_assignment in schemaModel["participants"]: + schemas = execute_sql_query_standalone(query=pquery, param={"shipcall_id" : new_id, "participant_id" : participant_assignment["participant_id"], "type" : participant_assignment["type"]}, command_type="execute", pooledConnection=pooledConnection) + + + from BreCal.stubs.user import get_user_simple + # assign an artificial user with id 29 (maxm) & participant type 136 + user_data = get_user_simple().__dict__ + user_data["id"] = 29 + user_data["participant_id"] = 136 + + # POST in the history + query = SQLQuery.create_sql_query_history_post() + schemas = execute_sql_query_standalone(query=query, param={"scid" : new_id, "pid" : user_data["participant_id"], "uid" : user_data["id"]}, command_type="execute", pooledConnection=pooledConnection) + assert schemas == 1, f"unsuccessful history POST" + + + finally: + pooledConnection.close() + return + +def test_sql_create_history_post_matches_legacy_function(): + from BreCal.database.sql_queries import create_sql_query_history_post + + query_refactored = SQLQuery.create_sql_query_history_post() + query_legacy = create_sql_query_history_post() + assert isinstance(query_refactored,str) + assert query_refactored==query_legacy, f"the refactored code to generate the query must be absolutely identical to the legacy version" + return + +def test_sql_get_shipcall_by_id(): + schemaModel = {"id":63} + + sentinel = object() + query = SQLQuery.get_shipcall_by_id() + pooledConnection = local_db.getPoolConnection() + commands = pydapper.using(pooledConnection) + theshipcall = commands.query_single_or_default(query, sentinel, param={"id" : schemaModel["id"]}, model=model.Shipcall) + pooledConnection.close() + assert not theshipcall is sentinel, f"failed GET user query" + assert theshipcall.id==schemaModel["id"] + return + +def test_sql_get_shipcall_by_id_short_version(): + schemaModel = {"id":63} + + # when model is defined, returns the data model + query = SQLQuery.get_shipcall_by_id() + schemas = execute_sql_query_standalone(query=query, param={"id" : schemaModel["id"]}, model=model.Shipcall, command_type="single") + + assert schemas.id==schemaModel["id"] + assert isinstance(schemas, model.Shipcall) + + # when model = None, returns a dictionary + query = SQLQuery.get_shipcall_by_id() + schemas = execute_sql_query_standalone(query=query, param={"id" : schemaModel["id"]}, command_type="single") + assert isinstance(schemas, dict) + assert schemas.get("id")==schemaModel["id"] + return + +def test_sql_get_shipcall_put_refactored_equals_extended_version(): + from BreCal.database.sql_queries import create_sql_query_shipcall_put + from BreCal.stubs.shipcall import get_stub_valid_shipcall_shifting, get_stub_valid_ship_loaded_model + + post_data = get_stub_valid_shipcall_shifting() + post_data["voyage"] = "pytestRS71" # perform tagging to identify the shipcalls created by pytests (<16 characters, no special characters). + schemaModel = get_stub_valid_ship_loaded_model(post_data) + + legacy_query = create_sql_query_shipcall_put(schemaModel) + refactored_query = SQLQuery.get_shipcall_put(schemaModel) + + assert refactored_query == legacy_query, f"version conflict. the refactored query must precisely match the legacy query!" + return + +def test_sql_get_shipcall_participant_map_by_shipcall_id(): + schemaModel = {"id":152} + + query = SQLQuery.get_shipcall_participant_map_by_shipcall_id() + pdata = execute_sql_query_standalone(query=query, param={"id" : schemaModel["id"]}, command_type="query") # existing list of assignments + + assert len(pdata)==4, f"there should be four assigned participants for the shipcall with id {schemaModel.get('id')}" + return + +def test_sql__get_shipcall__get_spm__optionally_update_shipcall(): + schemaModel = {'id': 152} + query = SQLQuery.get_shipcall_by_id() + shipcall = execute_sql_query_standalone(query=query, param={"id" : schemaModel["id"]}, command_type="single", model=model.Shipcall) + + query = SQLQuery.get_shipcall_participant_map_by_shipcall_id() + pdata = execute_sql_query_standalone(query=query, param={"id" : shipcall.id}, command_type="query") # existing list of assignments + + assert len(pdata)==4, f"there should be four assigned participants for the shipcall with id {shipcall.id}" + + for participant_assignment in shipcall.participants: + found_participant = False + for elem in pdata: + if elem["participant_id"] == participant_assignment["participant_id"] and elem["type"] == participant_assignment["type"]: + found_participant = True + break + if not found_participant: + nquery = SQLQuery.get_shipcall_post_update_shipcall_participant_map() + ndata = execute_sql_query_standalone(query=nquery, param={"shipcall_id" : shipcall.id, "participant_id" : participant_assignment["participant_id"], "type" : participant_assignment["type"]}, command_type="execute") # existing list of assignments + return + +def test_sql__shipcall_post__get_last_insert_id__get_spm__update_participants__verify_changes(): + """ + this combinatorial test: + 1.) creates a novel shipcall + 2.) obtains the ID of the just-created shipcall + 3.) reads the participant map for that ID and verifies, that there are no participants listed + 4.) iteratively updates the participant map of the ID (using proxy data) + 5.) verifies the update + """ + from BreCal.stubs.shipcall import get_stub_valid_shipcall_shifting, get_stub_valid_ship_loaded_model + pooledConnection = local_db.getPoolConnection() + commands = pydapper.using(pooledConnection) + try: + + # 1.) create shipcall + post_data = get_stub_valid_shipcall_shifting() + post_data["voyage"] = "pytestRS71" # perform tagging to identify the shipcalls created by pytests (<16 characters, no special characters). + schemaModel = get_stub_valid_ship_loaded_model(post_data) + query = SQLQuery.get_shipcall_post(schemaModel) # refactored variant of create_sql_query_shipcall_post (more concise) + schemas = execute_sql_query_standalone(query=query, param=schemaModel, command_type="execute", pooledConnection=pooledConnection) + assert schemas==1, f"unsuccessful query execution. Query: {query}" + + # 2.) obtain the ID of the novel shipcall + # within the same pooledConnection, ask for the last inserted id + query = SQLQuery.get_shipcall_post_last_insert_id() + new_id = commands.execute_scalar(query) + assert new_id > 0, f"the new id should be unlike 0.." + + # 3.) read the ShipcallParticipantMap for the novel id + query = SQLQuery.get_shipcall_participant_map_by_shipcall_id() + pdata = execute_sql_query_standalone(query=query, param={"id" : new_id}, command_type="query") # existing list of assignments + assert len(pdata)==0, f"as the POST query does not include participants in this case, the novel id should not have assigned participants." + + ### proxy data ### + # loop across passed participant ids, creating entries for those not present in pdata + schemaModel = {'id': new_id, "participants":[{'id': 128, 'participant_id': 2, 'type': 4}, {'id': 129, 'participant_id': 3, 'type': 1}, {'id': 130, 'participant_id': 4, 'type': 2}, {'id': 131, 'participant_id': 6, 'type': 8}]} + + # 4.) assign the participants + for participant_assignment in schemaModel["participants"]: + found_participant = False + for elem in pdata: + if elem["participant_id"] == participant_assignment["participant_id"] and elem["type"] == participant_assignment["type"]: + found_participant = True + break + if not found_participant: + nquery = SQLQuery.get_shipcall_post_update_shipcall_participant_map() + ndata = execute_sql_query_standalone(query=nquery, param={"shipcall_id" : new_id, "participant_id" : participant_assignment["participant_id"], "type" : participant_assignment["type"]}, command_type="execute") # existing list of assignments + + # 5.) verify the update (5 participants, including the false one) + query = SQLQuery.get_shipcall_participant_map_by_shipcall_id() + pdata = execute_sql_query_standalone(query=query, param={"id" : new_id}, command_type="query") # existing list of assignments + assert len(pdata)==5, f"due to the PUT, there shall now be five participants, as defined in schemaModel." + + # 6.) delete the incorrect participant (last entry in the list in this case) + dquery = SQLQuery.get_shipcall_participant_map_delete_by_id() + ddata = execute_sql_query_standalone(query=dquery, param={"existing_id" : pdata[-1].get("id")}, command_type="execute") + + # 7.) verify the update (now 4 participants) + query = SQLQuery.get_shipcall_participant_map_by_shipcall_id() + pdata = execute_sql_query_standalone(query=query, param={"id" : new_id}, command_type="query") # existing list of assignments + assert len(pdata)==4, f"due to the PUT, there shall now be five participants, as defined in schemaModel." + + finally: + pooledConnection.close() + return + +def test_sql_query_get_shipcalls_is_identical_to_legacy_query(): + from BreCal.database.sql_queries import create_sql_query_shipcall_get + + options = {'past_days':7} + query_refactored = SQLQuery.get_shipcalls(options) + query_legacy = create_sql_query_shipcall_get(options) + + assert query_refactored == query_legacy, f"the refactored code to generate the query must be absolutely identical to the legacy version" + return + +def test_sql_query_post_ship_is_identical_to_legacy_query(): + from BreCal.database.sql_queries import SQLQuery, create_sql_query_ship_post, create_sql_query_ship_put + from BreCal.stubs.ship import get_stub_valid_ship_loaded_model + + schemaModel = get_stub_valid_ship_loaded_model() + + query_refactored = SQLQuery.get_ship_post(schemaModel) + query_legacy = create_sql_query_ship_post(schemaModel) + + assert query_refactored == query_legacy, f"the refactored code to generate the query must be absolutely identical to the legacy version" + return + +def test_sql_query_put_ship_is_identical_to_legacy_query(): + from BreCal.database.sql_queries import SQLQuery, create_sql_query_ship_post, create_sql_query_ship_put + from BreCal.stubs.ship import get_stub_valid_ship_loaded_model + + schemaModel = get_stub_valid_ship_loaded_model() + query_refactored = SQLQuery.get_ship_put(schemaModel) + query_legacy = create_sql_query_ship_put(schemaModel) + + assert query_refactored == query_legacy, f"the refactored code to generate the query must be absolutely identical to the legacy version" + return + + + #schemas = execute_sql_query_standalone(query=SQLQuery.get_berth(), param={})