refactoring SQL queries into a joint object. Creating stubs and unit tests to ensure proper functionality.

This commit is contained in:
Max Metz 2024-05-29 11:22:03 +02:00
parent f57b317a27
commit b4c105946a
8 changed files with 481 additions and 60 deletions

View File

@ -79,6 +79,15 @@ def execute_sql_query_standalone(query, param={}, pooledConnection=None, model=N
elif command_type=="execute":
schemas = commands.execute(query, param=param)
elif command_type=="single":
sentinel = object()
# pulls a *single* row from the query. Typically, these queries require an ID within the param dictionary.
# when providing a model, such as model.Shipcall, the dataset is immediately translated into a data model.
schemas = commands.query_single_or_default(query, sentinel, param=param) if model is None else commands.query_single_or_default(query, sentinel, param=param, model=model)
if schemas is sentinel:
raise Exception("no such record")
else:
raise ValueError(command_type)

View File

@ -25,25 +25,6 @@ def create_sql_query_shipcall_get(options:dict)->str:
"(etd >= DATE(NOW() - INTERVAL %d DAY)))) " +
"ORDER BY eta") % (options["past_days"], options["past_days"], options["past_days"], options["past_days"])
"""
alternatively, f-strings could be used.
query_two = ("SELECT s.id as id, ship_id, type, eta, voyage, etd, arrival_berth_id, departure_berth_id, tug_required, pilot_required, " +
"flags, s.pier_side, bunkering, replenishing_terminal, replenishing_lock, draft, tidal_window_from, " +
"tidal_window_to, rain_sensitive_cargo, recommended_tugs, anchored, moored_lock, canceled, evaluation, " +
"evaluation_message, evaluation_time, evaluation_notifications_sent, s.created as created, s.modified as modified, time_ref_point " +
"FROM shipcall s " +
"LEFT JOIN times t ON t.shipcall_id = s.id AND t.participant_type = 8 " +
"WHERE " +
"(type = 1 AND " +
f"((t.id IS NOT NULL AND t.eta_berth >= DATE(NOW() - INTERVAL {options['past_days']} DAY)) OR " +
f"(eta >= DATE(NOW() - INTERVAL {options['past_days']} DAY)))) OR " +
"((type = 2 OR type = 3) AND " +
f"((t.id IS NOT NULL AND t.etd_berth >= DATE(NOW() - INTERVAL {options['past_days']} DAY)) OR " +
f"(etd >= DATE(NOW() - INTERVAL {options['past_days']} DAY)))) " +
"ORDER BY eta")
assert query==query_two
"""
return query
@ -100,7 +81,7 @@ def create_sql_query_shipcall_post(schemaModel:dict)->str:
isNotFirst = True
query += "?" + param_key + "?"
query += ")"
return
return query
def create_sql_query_shipcall_put(schemaModel:dict)->str:
query = "UPDATE shipcall SET "
@ -162,6 +143,53 @@ def create_sql_query_user_put(schemaModel:dict):
query += "WHERE id = ?id?"
return query
def create_sql_query_ship_post(schemaModel:dict):
query = "INSERT INTO ship ("
isNotFirst = False
for key in schemaModel.keys():
if key == "id":
continue
if key == "created":
continue
if key == "modified":
continue
if isNotFirst:
query += ","
isNotFirst = True
query += key
query += ") VALUES ("
isNotFirst = False
for key in schemaModel.keys():
if key == "id":
continue
if key == "created":
continue
if key == "modified":
continue
if isNotFirst:
query += ","
isNotFirst = True
query += "?" + key + "?"
query += ")"
return query
def create_sql_query_ship_put(schemaModel:dict):
query = "UPDATE ship SET "
isNotFirst = False
for key in schemaModel.keys():
if key == "id":
continue
if key == "created":
continue
if key == "modified":
continue
if isNotFirst:
query += ", "
isNotFirst = True
query += key + " = ?" + key + "? "
query += "WHERE id = ?id?"
return query
@ -213,8 +241,24 @@ class SQLQuery():
@staticmethod
def get_shipcalls(options:dict={'past_days':3})->str:
# a pytest proves this method to be identical to create_sql_query_shipcall_get(options)
assert 'past_days' in list(options.keys()), f"there must be a key 'past_days' in the options, which determines, how recent the returned list of shipcalls shall be." # part of a pytest.raises
query = create_sql_query_shipcall_get(options)
past_days = options['past_days']
query = ("SELECT s.id as id, ship_id, type, eta, voyage, etd, arrival_berth_id, departure_berth_id, tug_required, pilot_required, " + \
"flags, s.pier_side, bunkering, replenishing_terminal, replenishing_lock, draft, tidal_window_from, " + \
"tidal_window_to, rain_sensitive_cargo, recommended_tugs, anchored, moored_lock, canceled, evaluation, " + \
"evaluation_message, evaluation_time, evaluation_notifications_sent, s.created as created, s.modified as modified, time_ref_point " + \
"FROM shipcall s " + \
"LEFT JOIN times t ON t.shipcall_id = s.id AND t.participant_type = 8 " + \
"WHERE " + \
"(type = 1 AND " + \
f"((t.id IS NOT NULL AND t.eta_berth >= DATE(NOW() - INTERVAL {past_days} DAY)) OR " + \
f"(eta >= DATE(NOW() - INTERVAL {past_days} DAY)))) OR " + \
"((type = 2 OR type = 3) AND " + \
f"((t.id IS NOT NULL AND t.etd_berth >= DATE(NOW() - INTERVAL {past_days} DAY)) OR " + \
f"(etd >= DATE(NOW() - INTERVAL {past_days} DAY)))) " + \
"ORDER BY eta")
return query
@staticmethod
@ -254,4 +298,121 @@ class SQLQuery():
def get_participants()->str:
query = "SELECT participant_id, type FROM shipcall_participant_map WHERE shipcall_id=?shipcall_id?"
return query
@staticmethod
def get_shipcall_post(schemaModel:dict)->str:
# a pytest proves this method to be identical to create_sql_query_shipcall_post(schemaModel)
param_keys = {key:key for key in schemaModel.keys()}
param_keys["type"] = "type_value"
param_keys["evaluation"] = "evaluation_value"
prefix = "INSERT INTO shipcall ("
bridge = ") VALUES ("
suffix = ")"
stage1 = ",".join([key for key in schemaModel.keys() if not key in ["id","participants","created","modified","evaluation","evaluation_message","type_value","evaluation_value"]])
stage2 = ",".join([f"?{param_keys.get(key)}?" for key in schemaModel.keys() if not key in ["id","participants","created","modified","evaluation","evaluation_message","type_value","evaluation_value"]])
query = prefix+stage1+bridge+stage2+suffix
return query
@staticmethod
def get_last_insert_id()->str:
query = "select last_insert_id()"
return query
@staticmethod
def get_shipcall_post_last_insert_id()->str:
"""alias function. May be deleted soon"""
query = SQLQuery.get_last_insert_id()
return query
@staticmethod
def get_shipcall_post_update_shipcall_participant_map()->str:
query = "INSERT INTO shipcall_participant_map (shipcall_id, participant_id, type) VALUES (?shipcall_id?, ?participant_id?, ?type?)"
return query
@staticmethod
def create_sql_query_history_post()->str:
query = create_sql_query_history_post()
return query
@staticmethod
def get_shipcall_by_id()->str:
query = "SELECT * FROM shipcall where id = ?id?"
return query
@staticmethod
def get_shipcall_put(schemaModel:dict)->str:
# a pytest proves this method to be identical to create_sql_query_shipcall_put(schemaModel)
param_keys = {key:key for key in schemaModel.keys()}
param_keys["type"] = "type_value"
param_keys["evaluation"] = "evaluation_value"
prefix = "UPDATE shipcall SET "
suffix = "WHERE id = ?id?"
body = ", ".join([f"{key} = ?{param_keys.get(key)}? " for key in schemaModel.keys() if key not in ["id", "participants", "created", "modified", "evaluation", "evaluation_message", "type_value", "evaluation_value"]])
query = prefix + body + suffix
return query
@staticmethod
def get_shipcall_participant_map_by_shipcall_id()->str:
query = "SELECT id, participant_id, type FROM shipcall_participant_map where shipcall_id = ?id?"
return query
@staticmethod
def get_shipcall_participant_map_delete_by_id()->str:
query = "DELETE FROM shipcall_participant_map WHERE id = ?existing_id?"
return query
@staticmethod
def create_sql_query_history_put()->str:
query = create_sql_query_history_put()
return query
@staticmethod
def get_ship_post(schemaModel:dict)->str:
# a pytest proves this method to be identical to create_sql_query_ship_post(schemaModel)
prefix = "INSERT INTO ship ("
suffix = ")"
bridge = ") VALUES ("
stage1 = ",".join([key for key in schemaModel.keys() if not key in ["id", "created", "modified"]])
stage2 = ",".join([f"?{key}?" for key in schemaModel.keys() if not key in ["id", "created", "modified"]])
query = prefix + stage1 + bridge + stage2 + suffix
return query
@staticmethod
def get_ship_put(schemaModel:dict)->str:
# a pytest proves this method to be identical to create_sql_query_ship_put(schemaModel)
prefix = "UPDATE ship SET "
suffix = "WHERE id = ?id?"
body = ", ".join([f"{key} = ?{key}? " for key in schemaModel.keys() if not key in ["id","created","modified"]])
query = prefix + body + suffix
return query
@staticmethod
def get_ship_delete_by_id()->str:
query = "UPDATE ship SET deleted = 1 WHERE id = ?id?"
return query
@staticmethod
def get_notification_post()->str:
raise NotImplementedError()
# #TODO: this query is wrong and just a proxy for a POST request
query = "INSERT INTO shipcall_participant_map (shipcall_id, participant_id, type) VALUES (?shipcall_id?, ?participant_id?, ?type?)"
return query
@staticmethod
def get_shipcall_put_notification_state()->str:
raise NotImplementedError()
# #TODO: use evaluation_notifications_sent here and consider only the shipcall_id
# #TODO: query
query = ...
return query

View File

@ -34,6 +34,7 @@ def update_shipcall_in_mysql_database(sql_connection, shipcall:Shipcall, relevan
def build_mysql_query_to_update_shipcall(shipcall, relevant_keys:list):
"""builds a mysql query, which updates the shipcall table. In particular, the provided shipcall will be updated for each key in {relevant_keys}"""
# #TODO: refactor into SQLQuery
schemaModel = shipcall.__dict__
# prepare prefix and suffix. Then build the body of the query
@ -55,7 +56,6 @@ def update_all_shipcalls_in_mysql_database(sql_connection, sql_handler:SQLHandle
sql_handler: an SQLHandler instance
shipcall_df: dataframe, which stores the data that is used to retrieve the shipcall data models (that are then updated in the database)
"""
print(shipcall_df)
for shipcall_id in shipcall_df.index:
shipcall = sql_handler.df_loc_to_data_model(df=shipcall_df, id=shipcall_id, model_str="shipcall")
update_shipcall_in_mysql_database(sql_connection, shipcall=shipcall, relevant_keys = ["evaluation", "evaluation_message"])

View File

@ -73,7 +73,7 @@ def PostShipcalls(schemaModel):
pooledConnection = local_db.getPoolConnection()
commands = pydapper.using(pooledConnection)
# query = create_sql_query_shipcall_post(schemaModel)
# query = SQLQuery.get_shipcall_post(schemaModel) # create_sql_query_shipcall_post(schemaModel)
query = "INSERT INTO shipcall ("
isNotFirst = False
for key in schemaModel.keys():
@ -126,22 +126,15 @@ def PostShipcalls(schemaModel):
isNotFirst = True
query += "?" + param_key + "?"
query += ")"
# #TODO: enter completeness validation here. Only shipcalls, where all required fields are filled in are valid
# "Pflichtfelder: z.B. die Felder bei der Neuanlage eines shipcalls müssen entsprechend des Anlauf-Typs belegt sein, damit gespeichert werden kann" - Daniel Schick
# #TODO: enter role validation here. Only users of correct type should be allowed to post a shipcall
# #TODO: enter reference validation here
# full_id_existances = check_id_existances(mysql_connector_instance=sql_connection, schemaModel=schemaModel, debug=False)
# if not full_id_existances:
# pooledConnection.close()
# return json.dumps({"message" : "call failed. missing mandatory keywords."}), 500, {'Content-Type': 'application/json; charset=utf-8'}
commands.execute(query, schemaModel)
# lquery = SQLQuery.get_shipcall_post_last_insert_id()
# new_id = commands.execute_scalar(lquery)
new_id = commands.execute_scalar("select last_insert_id()")
# add participant assignments if we have a list of participants
if 'participants' in schemaModel:
# pquery = SQLQuery.get_shipcall_post_update_shipcall_participant_map()
pquery = "INSERT INTO shipcall_participant_map (shipcall_id, participant_id, type) VALUES (?shipcall_id?, ?participant_id?, ?type?)"
for participant_assignment in schemaModel["participants"]:
commands.execute(pquery, param={"shipcall_id" : new_id, "participant_id" : participant_assignment["participant_id"], "type" : participant_assignment["type"]})
@ -152,7 +145,7 @@ def PostShipcalls(schemaModel):
# save history data
# TODO: set ETA properly
user_data = check_jwt()
# query = create_sql_query_history_post()
# query = SQLQuery.create_sql_query_history_post()
query = "INSERT INTO history (participant_id, shipcall_id, user_id, timestamp, eta, type, operation) VALUES (?pid?, ?scid?, ?uid?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, 1, 1)"
commands.execute(query, {"scid" : new_id, "pid" : user_data["participant_id"], "uid" : user_data["id"]})
@ -193,12 +186,14 @@ def PutShipcalls(schemaModel):
# test if object to update is found
sentinel = object()
# query = SQLQuery.get_shipcall_by_id()
# theshipcall = commands.query_single_or_default(query, sentinel, param={"id" : schemaModel["id"]})
theshipcall = commands.query_single_or_default("SELECT * FROM shipcall where id = ?id?", sentinel, param={"id" : schemaModel["id"]})
if theshipcall is sentinel:
pooledConnection.close()
return json.dumps("no such record"), 404, {'Content-Type': 'application/json; charset=utf-8'}
# query = create_sql_query_shipcall_put(schemaModel)
# query = SQLQuery.get_shipcall_put(schemaModel)
query = "UPDATE shipcall SET "
isNotFirst = False
for key in schemaModel.keys():
@ -230,18 +225,9 @@ def PutShipcalls(schemaModel):
query += "WHERE id = ?id?"
# #TODO: enter role validation here.
# #TODO: enter completeness validation here. Only shipcalls, where all required fields are filled in are valid
# #TODO: enter reference validation here
# full_id_existances = check_id_existances(mysql_connector_instance=sql_connection, schemaModel=schemaModel, debug=False)
# if not full_id_existances:
# return ... (500)
affected_rows = commands.execute(query, param=schemaModel)
# pquery = SQLQuery.get_shipcall_participant_map_by_shipcall_id()
pquery = "SELECT id, participant_id, type FROM shipcall_participant_map where shipcall_id = ?id?"
pdata = commands.query(pquery,param={"id" : schemaModel["id"]}) # existing list of assignments
@ -254,6 +240,7 @@ def PutShipcalls(schemaModel):
found_participant = True
break
if not found_participant:
# nquery = SQLQuery.get_shipcall_post_update_shipcall_participant_map()
nquery = "INSERT INTO shipcall_participant_map (shipcall_id, participant_id, type) VALUES (?shipcall_id?, ?participant_id?, ?type?)"
commands.execute(nquery, param={"shipcall_id" : schemaModel["id"], "participant_id" : participant_assignment["participant_id"], "type" : participant_assignment["type"]})
@ -265,15 +252,13 @@ def PutShipcalls(schemaModel):
found_participant = True
break;
if not found_participant:
# dquery = SQLQuery.get_shipcall_participant_map_delete_by_id()
dquery = "DELETE FROM shipcall_participant_map WHERE id = ?existing_id?"
commands.execute(dquery, param={"existing_id" : elem["id"]})
# apply 'Traffic Light' evaluation to obtain 'GREEN', 'YELLOW' or 'RED' evaluation state. The function internally updates the mysql database
# evaluate_shipcall_state(mysql_connector_instance=pooledConnection, shipcall_id=schemaModel["id"]) # schemaModel["id"] refers to the shipcall id
# save history data
# TODO: set ETA properly
# query = create_sql_query_history_put()
# query = SQLQuery.create_sql_query_history_put()
query = "INSERT INTO history (participant_id, shipcall_id, user_id, timestamp, eta, type, operation) VALUES (?pid?, ?scid?, ?uid?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, 1, 2)"
commands.execute(query, {"scid" : schemaModel["id"], "pid" : user_data["participant_id"], "uid" : user_data["id"]})

View File

@ -49,6 +49,7 @@ def PostShip(schemaModel):
pooledConnection = local_db.getPoolConnection()
commands = pydapper.using(pooledConnection)
# query = SQLQuery.create_sql_query_ship_post(schemaModel)
query = "INSERT INTO ship ("
isNotFirst = False
for key in schemaModel.keys():
@ -78,6 +79,8 @@ def PostShip(schemaModel):
query += ")"
commands.execute(query, schemaModel)
# nquery = SQLQuery.get_last_insert_id()
# new_id = commands.execute_scalar(nquery)
new_id = commands.execute_scalar("select last_insert_id()")
pooledConnection.close()
@ -103,6 +106,7 @@ def PutShip(schemaModel):
pooledConnection = local_db.getPoolConnection()
commands = pydapper.using(pooledConnection)
# query = SQLQuery.create_sql_query_ship_put(schemaModel)
query = "UPDATE ship SET "
isNotFirst = False
for key in schemaModel.keys():
@ -143,6 +147,8 @@ def DeleteShip(options):
pooledConnection = local_db.getPoolConnection()
commands = pydapper.using(pooledConnection)
# query = SQLQuery.get_ship_delete_by_id()
# affected_rows = commands.execute(query, param={"id" : options["id"]})
affected_rows = commands.execute("UPDATE ship SET deleted = 1 WHERE id = ?id?", param={"id" : options["id"]})
pooledConnection.close()

View File

@ -54,10 +54,19 @@ class EvaluationType(IntEnum):
def _missing_(cls, value):
return cls.undefined
class NotificationType(IntEnum):
class NotificationType(IntEnum):
"""
Any user has the attributes
'notify_email' -> NotificationType.email
'notify_popup' -> NotificationType.push
'notify_whatsapp' -> undeclared
'notify_signal' -> undeclared
"""
undefined = 0
email = 1
push = 2
# whatsapp = 3
# signal = 4
@classmethod
def _missing_(cls, value):
@ -118,11 +127,17 @@ class GetVerifyInlineResp(Schema):
@dataclass
class Notification:
"""
Base data class for any notification.
Description:
'An entry corresponds to an alarm given by a violated rule during times update'
"""
id: int
shipcall_id: int
level: int
type: NotificationType
message: str
shipcall_id: int # 'shipcall record that caused the notification'
level: int # 'severity of the notification'
type: NotificationType # 'type of the notification'
message: str # 'individual message'
created: datetime
modified: datetime
@ -436,6 +451,7 @@ class UserSchema(Schema):
user_email = fields.String(allow_none=True, metadata={'Required':False}, validate=[validate.Length(max=64)])
old_password = fields.String(allow_none=True, metadata={'Required':False}, validate=[validate.Length(max=128)])
new_password = fields.String(allow_none=True, metadata={'Required':False}, validate=[validate.Length(min=6, max=128)])
# #TODO: the user schema does not (yet) include the 'notify_' fields
@validates("user_phone")
def validate_user_phone(self, value):
@ -488,10 +504,10 @@ class User:
user_phone: str
password_hash: str
api_key: str
notify_email: bool
notify_whatsapp: bool
notify_signal: bool
notify_popup: bool
notify_email: bool # #TODO_clarify: should we use an IntFlag for multi-assignment?
notify_whatsapp: bool # #TODO_clarify: should we use an IntFlag for multi-assignment?
notify_signal: bool # #TODO_clarify: should we use an IntFlag for multi-assignment?
notify_popup: bool # #TODO_clarify: should we use an IntFlag for multi-assignment?
created: datetime
modified: datetime

View File

@ -201,3 +201,7 @@ def get_stub_shipcall_arrival_invalid_missing_type():
post_data.pop("type", None)
return post_data
def get_stub_valid_ship_loaded_model(post_data):
from BreCal.schemas import model
loadedModel = model.ShipcallSchema().load(data=post_data, many=False, partial=True)
return loadedModel

View File

@ -284,4 +284,244 @@ def test_sql_get_all_shipcalls_and_assign_participants():
]), f"at least one of the shipcalls should have an assigned model.Participant_Assignment"
return
def test_sqlquery_get_shipcal_post_identical_to_create_sql_query_shipcall_post():
from BreCal.database.sql_queries import create_sql_query_shipcall_post
from BreCal.stubs.shipcall import get_stub_valid_shipcall_shifting, get_stub_valid_ship_loaded_model
post_data = get_stub_valid_shipcall_shifting()
schemaModel = get_stub_valid_ship_loaded_model(post_data)
query1 = SQLQuery.get_shipcall_post(schemaModel) # refactored variant of create_sql_query_shipcall_post (more concise)
query2 = create_sql_query_shipcall_post(schemaModel)
assert query1==query2
return
def test_sql_post_shipcall():
"""issues a post-request with stub data and adds it to the database."""
from BreCal.database.sql_queries import create_sql_query_shipcall_post
from BreCal.stubs.shipcall import get_stub_valid_shipcall_shifting, get_stub_valid_ship_loaded_model
pooledConnection = local_db.getPoolConnection()
try:
commands = pydapper.using(pooledConnection)
post_data = get_stub_valid_shipcall_shifting()
post_data["voyage"] = "pytestRS71" # perform tagging to identify the shipcalls created by pytests (<16 characters, no special characters).
schemaModel = get_stub_valid_ship_loaded_model(post_data)
query = SQLQuery.get_shipcall_post(schemaModel) # refactored variant of create_sql_query_shipcall_post (more concise)
schemas = execute_sql_query_standalone(query=query, param=schemaModel, command_type="execute", pooledConnection=pooledConnection)
assert schemas==1, f"unsuccessful query execution. Query: {query}"
# within the same pooledConnection, ask for the last inserted id
query = SQLQuery.get_shipcall_post_last_insert_id()
new_id = commands.execute_scalar(query)
assert new_id > 0, f"the new id should be unlike 0.."
# add participant assignments if we have a list of participants
if 'participants' in schemaModel:
pquery = SQLQuery.get_shipcall_post_update_shipcall_participant_map()
for participant_assignment in schemaModel["participants"]:
schemas = execute_sql_query_standalone(query=pquery, param={"shipcall_id" : new_id, "participant_id" : participant_assignment["participant_id"], "type" : participant_assignment["type"]}, command_type="execute", pooledConnection=pooledConnection)
from BreCal.stubs.user import get_user_simple
# assign an artificial user with id 29 (maxm) & participant type 136
user_data = get_user_simple().__dict__
user_data["id"] = 29
user_data["participant_id"] = 136
# POST in the history
query = SQLQuery.create_sql_query_history_post()
schemas = execute_sql_query_standalone(query=query, param={"scid" : new_id, "pid" : user_data["participant_id"], "uid" : user_data["id"]}, command_type="execute", pooledConnection=pooledConnection)
assert schemas == 1, f"unsuccessful history POST"
finally:
pooledConnection.close()
return
def test_sql_create_history_post_matches_legacy_function():
from BreCal.database.sql_queries import create_sql_query_history_post
query_refactored = SQLQuery.create_sql_query_history_post()
query_legacy = create_sql_query_history_post()
assert isinstance(query_refactored,str)
assert query_refactored==query_legacy, f"the refactored code to generate the query must be absolutely identical to the legacy version"
return
def test_sql_get_shipcall_by_id():
schemaModel = {"id":63}
sentinel = object()
query = SQLQuery.get_shipcall_by_id()
pooledConnection = local_db.getPoolConnection()
commands = pydapper.using(pooledConnection)
theshipcall = commands.query_single_or_default(query, sentinel, param={"id" : schemaModel["id"]}, model=model.Shipcall)
pooledConnection.close()
assert not theshipcall is sentinel, f"failed GET user query"
assert theshipcall.id==schemaModel["id"]
return
def test_sql_get_shipcall_by_id_short_version():
schemaModel = {"id":63}
# when model is defined, returns the data model
query = SQLQuery.get_shipcall_by_id()
schemas = execute_sql_query_standalone(query=query, param={"id" : schemaModel["id"]}, model=model.Shipcall, command_type="single")
assert schemas.id==schemaModel["id"]
assert isinstance(schemas, model.Shipcall)
# when model = None, returns a dictionary
query = SQLQuery.get_shipcall_by_id()
schemas = execute_sql_query_standalone(query=query, param={"id" : schemaModel["id"]}, command_type="single")
assert isinstance(schemas, dict)
assert schemas.get("id")==schemaModel["id"]
return
def test_sql_get_shipcall_put_refactored_equals_extended_version():
from BreCal.database.sql_queries import create_sql_query_shipcall_put
from BreCal.stubs.shipcall import get_stub_valid_shipcall_shifting, get_stub_valid_ship_loaded_model
post_data = get_stub_valid_shipcall_shifting()
post_data["voyage"] = "pytestRS71" # perform tagging to identify the shipcalls created by pytests (<16 characters, no special characters).
schemaModel = get_stub_valid_ship_loaded_model(post_data)
legacy_query = create_sql_query_shipcall_put(schemaModel)
refactored_query = SQLQuery.get_shipcall_put(schemaModel)
assert refactored_query == legacy_query, f"version conflict. the refactored query must precisely match the legacy query!"
return
def test_sql_get_shipcall_participant_map_by_shipcall_id():
schemaModel = {"id":152}
query = SQLQuery.get_shipcall_participant_map_by_shipcall_id()
pdata = execute_sql_query_standalone(query=query, param={"id" : schemaModel["id"]}, command_type="query") # existing list of assignments
assert len(pdata)==4, f"there should be four assigned participants for the shipcall with id {schemaModel.get('id')}"
return
def test_sql__get_shipcall__get_spm__optionally_update_shipcall():
schemaModel = {'id': 152}
query = SQLQuery.get_shipcall_by_id()
shipcall = execute_sql_query_standalone(query=query, param={"id" : schemaModel["id"]}, command_type="single", model=model.Shipcall)
query = SQLQuery.get_shipcall_participant_map_by_shipcall_id()
pdata = execute_sql_query_standalone(query=query, param={"id" : shipcall.id}, command_type="query") # existing list of assignments
assert len(pdata)==4, f"there should be four assigned participants for the shipcall with id {shipcall.id}"
for participant_assignment in shipcall.participants:
found_participant = False
for elem in pdata:
if elem["participant_id"] == participant_assignment["participant_id"] and elem["type"] == participant_assignment["type"]:
found_participant = True
break
if not found_participant:
nquery = SQLQuery.get_shipcall_post_update_shipcall_participant_map()
ndata = execute_sql_query_standalone(query=nquery, param={"shipcall_id" : shipcall.id, "participant_id" : participant_assignment["participant_id"], "type" : participant_assignment["type"]}, command_type="execute") # existing list of assignments
return
def test_sql__shipcall_post__get_last_insert_id__get_spm__update_participants__verify_changes():
"""
this combinatorial test:
1.) creates a novel shipcall
2.) obtains the ID of the just-created shipcall
3.) reads the participant map for that ID and verifies, that there are no participants listed
4.) iteratively updates the participant map of the ID (using proxy data)
5.) verifies the update
"""
from BreCal.stubs.shipcall import get_stub_valid_shipcall_shifting, get_stub_valid_ship_loaded_model
pooledConnection = local_db.getPoolConnection()
commands = pydapper.using(pooledConnection)
try:
# 1.) create shipcall
post_data = get_stub_valid_shipcall_shifting()
post_data["voyage"] = "pytestRS71" # perform tagging to identify the shipcalls created by pytests (<16 characters, no special characters).
schemaModel = get_stub_valid_ship_loaded_model(post_data)
query = SQLQuery.get_shipcall_post(schemaModel) # refactored variant of create_sql_query_shipcall_post (more concise)
schemas = execute_sql_query_standalone(query=query, param=schemaModel, command_type="execute", pooledConnection=pooledConnection)
assert schemas==1, f"unsuccessful query execution. Query: {query}"
# 2.) obtain the ID of the novel shipcall
# within the same pooledConnection, ask for the last inserted id
query = SQLQuery.get_shipcall_post_last_insert_id()
new_id = commands.execute_scalar(query)
assert new_id > 0, f"the new id should be unlike 0.."
# 3.) read the ShipcallParticipantMap for the novel id
query = SQLQuery.get_shipcall_participant_map_by_shipcall_id()
pdata = execute_sql_query_standalone(query=query, param={"id" : new_id}, command_type="query") # existing list of assignments
assert len(pdata)==0, f"as the POST query does not include participants in this case, the novel id should not have assigned participants."
### proxy data ###
# loop across passed participant ids, creating entries for those not present in pdata
schemaModel = {'id': new_id, "participants":[{'id': 128, 'participant_id': 2, 'type': 4}, {'id': 129, 'participant_id': 3, 'type': 1}, {'id': 130, 'participant_id': 4, 'type': 2}, {'id': 131, 'participant_id': 6, 'type': 8}]}
# 4.) assign the participants
for participant_assignment in schemaModel["participants"]:
found_participant = False
for elem in pdata:
if elem["participant_id"] == participant_assignment["participant_id"] and elem["type"] == participant_assignment["type"]:
found_participant = True
break
if not found_participant:
nquery = SQLQuery.get_shipcall_post_update_shipcall_participant_map()
ndata = execute_sql_query_standalone(query=nquery, param={"shipcall_id" : new_id, "participant_id" : participant_assignment["participant_id"], "type" : participant_assignment["type"]}, command_type="execute") # existing list of assignments
# 5.) verify the update (5 participants, including the false one)
query = SQLQuery.get_shipcall_participant_map_by_shipcall_id()
pdata = execute_sql_query_standalone(query=query, param={"id" : new_id}, command_type="query") # existing list of assignments
assert len(pdata)==5, f"due to the PUT, there shall now be five participants, as defined in schemaModel."
# 6.) delete the incorrect participant (last entry in the list in this case)
dquery = SQLQuery.get_shipcall_participant_map_delete_by_id()
ddata = execute_sql_query_standalone(query=dquery, param={"existing_id" : pdata[-1].get("id")}, command_type="execute")
# 7.) verify the update (now 4 participants)
query = SQLQuery.get_shipcall_participant_map_by_shipcall_id()
pdata = execute_sql_query_standalone(query=query, param={"id" : new_id}, command_type="query") # existing list of assignments
assert len(pdata)==4, f"due to the PUT, there shall now be five participants, as defined in schemaModel."
finally:
pooledConnection.close()
return
def test_sql_query_get_shipcalls_is_identical_to_legacy_query():
from BreCal.database.sql_queries import create_sql_query_shipcall_get
options = {'past_days':7}
query_refactored = SQLQuery.get_shipcalls(options)
query_legacy = create_sql_query_shipcall_get(options)
assert query_refactored == query_legacy, f"the refactored code to generate the query must be absolutely identical to the legacy version"
return
def test_sql_query_post_ship_is_identical_to_legacy_query():
from BreCal.database.sql_queries import SQLQuery, create_sql_query_ship_post, create_sql_query_ship_put
from BreCal.stubs.ship import get_stub_valid_ship_loaded_model
schemaModel = get_stub_valid_ship_loaded_model()
query_refactored = SQLQuery.get_ship_post(schemaModel)
query_legacy = create_sql_query_ship_post(schemaModel)
assert query_refactored == query_legacy, f"the refactored code to generate the query must be absolutely identical to the legacy version"
return
def test_sql_query_put_ship_is_identical_to_legacy_query():
from BreCal.database.sql_queries import SQLQuery, create_sql_query_ship_post, create_sql_query_ship_put
from BreCal.stubs.ship import get_stub_valid_ship_loaded_model
schemaModel = get_stub_valid_ship_loaded_model()
query_refactored = SQLQuery.get_ship_put(schemaModel)
query_legacy = create_sql_query_ship_put(schemaModel)
assert query_refactored == query_legacy, f"the refactored code to generate the query must be absolutely identical to the legacy version"
return
#schemas = execute_sql_query_standalone(query=SQLQuery.get_berth(), param={})