This repository has been archived on 2025-02-17. You can view files and clone it, but cannot push or open issues or pull requests.
BreCal/src/server/tests/database/test_sql_queries.py

528 lines
26 KiB
Python

import pytest
import os
import bcrypt
import pydapper
from BreCal import local_db
from BreCal.database.sql_handler import execute_sql_query_standalone
from BreCal.database.sql_queries import SQLQuery
from BreCal.schemas import model
from BreCal.stubs.user import get_user_simple
instance_path = os.path.join(os.path.expanduser('~'), "brecal", "src", "server", "instance", "instance")
local_db.initPool(os.path.dirname(instance_path), connection_filename="connection_data_local.json")
def test_sql_query_every_call_returns_str():
assert isinstance(SQLQuery.get_berth(), str)
return
def test_sql_query_get_berths():
schemas = execute_sql_query_standalone(query=SQLQuery.get_berth(), param={})
berths = [model.Berth(**schema) for schema in schemas]
assert all([isinstance(berth, model.Berth) for berth in berths]), f"one of the returned schemas is not a Berth object"
return
def test_sql_query_get_history():
options = {"shipcall_id":157}
history = execute_sql_query_standalone(query=SQLQuery.get_history(), param={"shipcallid" : options["shipcall_id"]}, model=model.History.from_query_row)
assert all([isinstance(hist,model.History) for hist in history])
return
def test_sql_query_get_user():
options = {"username":"maxm"}
users = execute_sql_query_standalone(query=SQLQuery.get_user(), param={"username" : options["username"]}, model=model.User)
assert all([isinstance(user,model.User) for user in users])
assert users[0].user_name==options["username"]
return
def test_sql_get_notifications():
import mysql.connector
# unfortunately, there currently is *no* notification in the database.
with pytest.raises(mysql.connector.errors.ProgrammingError, match="Unknown column 'shipcall_id' in 'field list'"):
options = {"shipcall_id":417}
notifications = execute_sql_query_standalone(query=SQLQuery.get_notifications(), param={"scid" : options["shipcall_id"]}, model=model.Notification.from_query_row)
assert all([isinstance(notification,model.Notification) for notification in notifications])
return
def test_sql_get_participants():
participants = execute_sql_query_standalone(query=SQLQuery.get_participants(), param={}, model=model.Participant)
assert all([isinstance(participant,model.Participant) for participant in participants])
return
def test_sql_get_participants():
options = {"user_id":29}
query = SQLQuery.get_participant_by_user_id()
participants = execute_sql_query_standalone(query=query, param={"userid" : options["user_id"]}, model=model.Participant)
assert all([isinstance(participant,model.Participant) for participant in participants])
assert len(participants)==1, f"there should only be one match for the respective user id"
assert participants[0].id == 136, f"user 29 belongs to participant_id 136"
return
def test_sql_get_shipcalls():
from BreCal.database.sql_queries import create_sql_query_shipcall_get
# different styles for the same outcome
options = {"past_days":3000}
query = create_sql_query_shipcall_get(options)
shipcalls = execute_sql_query_standalone(query=query, param=options, model=model.Shipcall.from_query_row)
assert all([isinstance(shipcall,model.Shipcall) for shipcall in shipcalls])
query = SQLQuery.get_shipcalls() # defaults to 'past_days'=3
shipcalls = execute_sql_query_standalone(query=query, param=options, model=model.Shipcall.from_query_row)
assert all([isinstance(shipcall,model.Shipcall) for shipcall in shipcalls])
query = SQLQuery.get_shipcalls({'past_days':3000})
shipcalls = execute_sql_query_standalone(query=query, param=options, model=model.Shipcall.from_query_row)
assert all([isinstance(shipcall,model.Shipcall) for shipcall in shipcalls])
# fails: options must contain 'past_days' key
with pytest.raises(AssertionError, match="there must be a key 'past_days' in the options, which determines"):
query = SQLQuery.get_shipcalls(options={})
shipcalls = execute_sql_query_standalone(query=query, param=options, model=model.Shipcall.from_query_row)
return
def test_sql_get_ships():
ships = execute_sql_query_standalone(query=SQLQuery.get_ships(), model=model.Ship)
assert all([isinstance(ship, model.Ship) for ship in ships])
return
def test_sql_get_times():
options = {'shipcall_id':153}
times = execute_sql_query_standalone(query=SQLQuery.get_times(), model=model.Times, param={"scid" : options["shipcall_id"]})
assert all([isinstance(time_,model.Times) for time_ in times])
assert times[0].shipcall_id==options["shipcall_id"]
return
def test_sql_get_user_by_id():
# success: id 29 exists
schemaModel = get_user_simple().__dict__
schemaModel["id"] = 29
sentinel = object()
query = SQLQuery.get_user_by_id()
pooledConnection = local_db.getPoolConnection()
commands = pydapper.using(pooledConnection)
theuser = commands.query_single_or_default(query, sentinel, param={"id" : schemaModel["id"]}, model=model.User)
if theuser is sentinel:
pooledConnection.close()
theuser.id == schemaModel["id"]
# fails: id 292212 does not exist (returns default, which is the sentinel object in this case)
schemaModel = get_user_simple().__dict__
schemaModel["id"] = 292212
sentinel = object()
query = SQLQuery.get_user_by_id()
pooledConnection = local_db.getPoolConnection()
commands = pydapper.using(pooledConnection)
theuser = commands.query_single_or_default(query, sentinel, param={"id" : schemaModel["id"]}, model=model.User)
if theuser is sentinel:
pooledConnection.close()
assert theuser is sentinel
return
def test_sql_get_user_put():
"""the PUT query for a user must be built based on a schemaModel, as the available data is dynamic and the query must be adaptive."""
schemaModel = get_user_simple().__dict__
schemaModel["id"] = 29
query = "UPDATE user SET "
isNotFirst = False
for key in schemaModel.keys():
if key == "id":
continue
if key == "old_password":
continue
if key == "new_password":
continue
if isNotFirst:
query += ", "
isNotFirst = True
query += key + " = ?" + key + "? "
query += "WHERE id = ?id?"
assert SQLQuery.get_user_put(schemaModel) == query
return
def test_sql_user_put_set_lastname_check_unset_lastname_check():
"""
Simply put, this method updates the last_name of user 29 to "Metz", verifies the change,
and then proceeds to set it back to "Mustermann", and verifies the change.
"""
# 1.) SET the last_name of user_id 29 to 'Metz' by a PUT command
schemaModel = get_user_simple().__dict__
schemaModel["id"] = 29
schemaModel["last_name"] = "Metz" # -> "Metz" -> "Mustermann"
schemaModel = {k:v for k,v in schemaModel.items() if k in ["id", "last_name"]}
query = SQLQuery.get_user_put(schemaModel)
affected_rows = execute_sql_query_standalone(query=query, param=schemaModel, command_type="execute")
assert affected_rows==1, f"at least one row should be affected by this call."
# 2.) GET the user_id 29 and verify the last_name is 'Metz'
sentinel = object()
query = SQLQuery.get_user_by_id()
pooledConnection = local_db.getPoolConnection()
commands = pydapper.using(pooledConnection)
theuser = commands.query_single_or_default(query, sentinel, param={"id" : schemaModel["id"]}, model=model.User)
pooledConnection.close()
assert not theuser is sentinel, f"failed GET user query"
assert theuser.last_name=="Metz", f"PUT command has been unsuccessful."
# 3.) SET the last_name of user_id 29 to 'Mustermann' by a PUT command
schemaModel = theuser.__dict__
schemaModel["last_name"] = "Mustermann"
schemaModel = {k:v for k,v in schemaModel.items() if k in ["id", "last_name"]}
query = SQLQuery.get_user_put(schemaModel)
affected_rows = execute_sql_query_standalone(query=query, param=schemaModel, command_type="execute")
assert affected_rows==1, f"at least one row should be affected by this call."
# 4.) GET the user_id 29 and verify the last_name is 'Mustermann'
sentinel = object()
query = SQLQuery.get_user_by_id()
pooledConnection = local_db.getPoolConnection()
commands = pydapper.using(pooledConnection)
theuser = commands.query_single_or_default(query, sentinel, param={"id" : schemaModel["id"]}, model=model.User)
pooledConnection.close()
assert not theuser is sentinel, f"failed GET user query"
assert theuser.last_name=="Mustermann", f"PUT command has been unsuccessful."
return
def test_sql_user_update_password():
"""This test updates the default password of user 29 from 'Start1234' to 'Start4321' and afterwards sets it back to 'Start1234'."""
# #TODO: this test very openly displays the password of 'maxm'. It makes sense to create a stub user in the database, which can be
# used for these tests, so an account without any importance or valuable assigned role is insecure instead.
# Set. Update the password of user 29 from 'Start1234' to 'Start4321'
schemaModel = get_user_simple().__dict__
schemaModel["id"] = 29
schemaModel["old_password"] = "Start1234"
schemaModel["new_password"] = "Start4321"
sentinel = object()
query = SQLQuery.get_user_by_id()
pooledConnection = local_db.getPoolConnection()
commands = pydapper.using(pooledConnection)
theuser = commands.query_single_or_default(query, sentinel, param={"id" : schemaModel["id"]}, model=model.User)
pooledConnection.close()
assert not theuser is sentinel, f"failed GET user query"
assert bcrypt.checkpw(schemaModel["old_password"].encode("utf-8"), bytes(theuser.password_hash, "utf-8")), f"old password does not match to the database entry"
password_hash = bcrypt.hashpw(schemaModel["new_password"].encode('utf-8'), bcrypt.gensalt( 12 )).decode('utf8')
query = SQLQuery.get_update_user_password()
affected_rows = execute_sql_query_standalone(query=query, param={"password_hash" : password_hash, "id" : schemaModel["id"]}, command_type="execute")
assert affected_rows == 1
# 2.) Revert. Set password back to the default (from 'Start4321' to 'Start1234')
schemaModel = get_user_simple().__dict__
schemaModel["id"] = 29
schemaModel["old_password"] = "Start4321"
schemaModel["new_password"] = "Start1234"
sentinel = object()
query = SQLQuery.get_user_by_id()
pooledConnection = local_db.getPoolConnection()
commands = pydapper.using(pooledConnection)
theuser = commands.query_single_or_default(query, sentinel, param={"id" : schemaModel["id"]}, model=model.User)
pooledConnection.close()
assert not theuser is sentinel, f"failed GET user query"
assert bcrypt.checkpw(schemaModel["old_password"].encode("utf-8"), bytes(theuser.password_hash, "utf-8")), f"old password does not match to the database entry"
password_hash = bcrypt.hashpw(schemaModel["new_password"].encode('utf-8'), bcrypt.gensalt( 12 )).decode('utf8')
query = SQLQuery.get_update_user_password()
affected_rows = execute_sql_query_standalone(query=query, param={"password_hash" : password_hash, "id" : schemaModel["id"]}, command_type="execute")
assert affected_rows == 1
return
def test_sql_get_participants_of_shipcall_id():
shipcall_id = 389
query = SQLQuery.get_participants()
participants = execute_sql_query_standalone(query=query, model=dict, param={"shipcall_id" : shipcall_id})
assert all([part.get("participant_id") is not None for part in participants])
assert all([part.get("type") is not None for part in participants])
# try to convert every participant into a model.Participant_Assignment
participants = [
model.Participant_Assignment(part["participant_id"], part["type"])
for part in participants
]
assert all([isinstance(part,model.Participant_Assignment) for part in participants])
return
def test_sql_get_all_shipcalls_and_assign_participants():
"""
this test reproduces the SQL query within BreCal.impl.shipcalls to make sure, that the
query first returns all shipcalls, and then assigns all participants of the respective shipcall to it.
"""
# get all shipcalls
options = {'past_days':30000}
query = SQLQuery.get_shipcalls(options)
shipcalls = execute_sql_query_standalone(query=query, model=model.Shipcall.from_query_row)
assert all([isinstance(shipcall,model.Shipcall) for shipcall in shipcalls])
# for every shipcall, assign all of its participants to it
for shipcall in shipcalls:
participant_query = SQLQuery.get_participants()
participants = execute_sql_query_standalone(query=participant_query, model=dict, param={"shipcall_id" : shipcall.id})
for record in participants:
pa = model.Participant_Assignment(record["participant_id"], record["type"])
shipcall.participants.append(pa)
assert any([
any([isinstance(participant, model.Participant_Assignment) for participant in shipcall.participants])
for shipcall in shipcalls
]), f"at least one of the shipcalls should have an assigned model.Participant_Assignment"
return
def test_sqlquery_get_shipcal_post_identical_to_create_sql_query_shipcall_post():
from BreCal.database.sql_queries import create_sql_query_shipcall_post
from BreCal.stubs.shipcall import get_stub_valid_shipcall_shifting, get_stub_valid_ship_loaded_model
post_data = get_stub_valid_shipcall_shifting()
schemaModel = get_stub_valid_ship_loaded_model(post_data)
query1 = SQLQuery.get_shipcall_post(schemaModel) # refactored variant of create_sql_query_shipcall_post (more concise)
query2 = create_sql_query_shipcall_post(schemaModel)
assert query1==query2
return
def test_sql_post_shipcall():
"""issues a post-request with stub data and adds it to the database."""
from BreCal.database.sql_queries import create_sql_query_shipcall_post
from BreCal.stubs.shipcall import get_stub_valid_shipcall_shifting, get_stub_valid_ship_loaded_model
pooledConnection = local_db.getPoolConnection()
try:
commands = pydapper.using(pooledConnection)
post_data = get_stub_valid_shipcall_shifting()
post_data["voyage"] = "pytestRS71" # perform tagging to identify the shipcalls created by pytests (<16 characters, no special characters).
schemaModel = get_stub_valid_ship_loaded_model(post_data)
query = SQLQuery.get_shipcall_post(schemaModel) # refactored variant of create_sql_query_shipcall_post (more concise)
schemas = execute_sql_query_standalone(query=query, param=schemaModel, command_type="execute", pooledConnection=pooledConnection)
assert schemas==1, f"unsuccessful query execution. Query: {query}"
# within the same pooledConnection, ask for the last inserted id
query = SQLQuery.get_shipcall_post_last_insert_id()
new_id = commands.execute_scalar(query)
assert new_id > 0, f"the new id should be unlike 0.."
# add participant assignments if we have a list of participants
if 'participants' in schemaModel:
pquery = SQLQuery.get_shipcall_post_update_shipcall_participant_map()
for participant_assignment in schemaModel["participants"]:
schemas = execute_sql_query_standalone(query=pquery, param={"shipcall_id" : new_id, "participant_id" : participant_assignment["participant_id"], "type" : participant_assignment["type"]}, command_type="execute", pooledConnection=pooledConnection)
from BreCal.stubs.user import get_user_simple
# assign an artificial user with id 29 (maxm) & participant type 136
user_data = get_user_simple().__dict__
user_data["id"] = 29
user_data["participant_id"] = 136
# POST in the history
query = SQLQuery.create_sql_query_history_post()
schemas = execute_sql_query_standalone(query=query, param={"scid" : new_id, "pid" : user_data["participant_id"], "uid" : user_data["id"]}, command_type="execute", pooledConnection=pooledConnection)
assert schemas == 1, f"unsuccessful history POST"
finally:
pooledConnection.close()
return
def test_sql_create_history_post_matches_legacy_function():
from BreCal.database.sql_queries import create_sql_query_history_post
query_refactored = SQLQuery.create_sql_query_history_post()
query_legacy = create_sql_query_history_post()
assert isinstance(query_refactored,str)
assert query_refactored==query_legacy, f"the refactored code to generate the query must be absolutely identical to the legacy version"
return
def test_sql_get_shipcall_by_id():
schemaModel = {"id":63}
sentinel = object()
query = SQLQuery.get_shipcall_by_id()
pooledConnection = local_db.getPoolConnection()
commands = pydapper.using(pooledConnection)
theshipcall = commands.query_single_or_default(query, sentinel, param={"id" : schemaModel["id"]}, model=model.Shipcall)
pooledConnection.close()
assert not theshipcall is sentinel, f"failed GET user query"
assert theshipcall.id==schemaModel["id"]
return
def test_sql_get_shipcall_by_id_short_version():
schemaModel = {"id":63}
# when model is defined, returns the data model
query = SQLQuery.get_shipcall_by_id()
schemas = execute_sql_query_standalone(query=query, param={"id" : schemaModel["id"]}, model=model.Shipcall, command_type="single")
assert schemas.id==schemaModel["id"]
assert isinstance(schemas, model.Shipcall)
# when model = None, returns a dictionary
query = SQLQuery.get_shipcall_by_id()
schemas = execute_sql_query_standalone(query=query, param={"id" : schemaModel["id"]}, command_type="single")
assert isinstance(schemas, dict)
assert schemas.get("id")==schemaModel["id"]
return
def test_sql_get_shipcall_put_refactored_equals_extended_version():
from BreCal.database.sql_queries import create_sql_query_shipcall_put
from BreCal.stubs.shipcall import get_stub_valid_shipcall_shifting, get_stub_valid_ship_loaded_model
post_data = get_stub_valid_shipcall_shifting()
post_data["voyage"] = "pytestRS71" # perform tagging to identify the shipcalls created by pytests (<16 characters, no special characters).
schemaModel = get_stub_valid_ship_loaded_model(post_data)
legacy_query = create_sql_query_shipcall_put(schemaModel)
refactored_query = SQLQuery.get_shipcall_put(schemaModel)
assert refactored_query == legacy_query, f"version conflict. the refactored query must precisely match the legacy query!"
return
def test_sql_get_shipcall_participant_map_by_shipcall_id():
schemaModel = {"id":152}
query = SQLQuery.get_shipcall_participant_map_by_shipcall_id()
pdata = execute_sql_query_standalone(query=query, param={"id" : schemaModel["id"]}, command_type="query") # existing list of assignments
assert len(pdata)==4, f"there should be four assigned participants for the shipcall with id {schemaModel.get('id')}"
return
def test_sql__get_shipcall__get_spm__optionally_update_shipcall():
schemaModel = {'id': 152}
query = SQLQuery.get_shipcall_by_id()
shipcall = execute_sql_query_standalone(query=query, param={"id" : schemaModel["id"]}, command_type="single", model=model.Shipcall)
query = SQLQuery.get_shipcall_participant_map_by_shipcall_id()
pdata = execute_sql_query_standalone(query=query, param={"id" : shipcall.id}, command_type="query") # existing list of assignments
assert len(pdata)==4, f"there should be four assigned participants for the shipcall with id {shipcall.id}"
for participant_assignment in shipcall.participants:
found_participant = False
for elem in pdata:
if elem["participant_id"] == participant_assignment["participant_id"] and elem["type"] == participant_assignment["type"]:
found_participant = True
break
if not found_participant:
nquery = SQLQuery.get_shipcall_post_update_shipcall_participant_map()
ndata = execute_sql_query_standalone(query=nquery, param={"shipcall_id" : shipcall.id, "participant_id" : participant_assignment["participant_id"], "type" : participant_assignment["type"]}, command_type="execute") # existing list of assignments
return
def test_sql__shipcall_post__get_last_insert_id__get_spm__update_participants__verify_changes():
"""
this combinatorial test:
1.) creates a novel shipcall
2.) obtains the ID of the just-created shipcall
3.) reads the participant map for that ID and verifies, that there are no participants listed
4.) iteratively updates the participant map of the ID (using proxy data)
5.) verifies the update
"""
from BreCal.stubs.shipcall import get_stub_valid_shipcall_shifting, get_stub_valid_ship_loaded_model
pooledConnection = local_db.getPoolConnection()
commands = pydapper.using(pooledConnection)
try:
# 1.) create shipcall
post_data = get_stub_valid_shipcall_shifting()
post_data["voyage"] = "pytestRS71" # perform tagging to identify the shipcalls created by pytests (<16 characters, no special characters).
schemaModel = get_stub_valid_ship_loaded_model(post_data)
query = SQLQuery.get_shipcall_post(schemaModel) # refactored variant of create_sql_query_shipcall_post (more concise)
schemas = execute_sql_query_standalone(query=query, param=schemaModel, command_type="execute", pooledConnection=pooledConnection)
assert schemas==1, f"unsuccessful query execution. Query: {query}"
# 2.) obtain the ID of the novel shipcall
# within the same pooledConnection, ask for the last inserted id
query = SQLQuery.get_shipcall_post_last_insert_id()
new_id = commands.execute_scalar(query)
assert new_id > 0, f"the new id should be unlike 0.."
# 3.) read the ShipcallParticipantMap for the novel id
query = SQLQuery.get_shipcall_participant_map_by_shipcall_id()
pdata = execute_sql_query_standalone(query=query, param={"id" : new_id}, command_type="query") # existing list of assignments
assert len(pdata)==0, f"as the POST query does not include participants in this case, the novel id should not have assigned participants."
### proxy data ###
# loop across passed participant ids, creating entries for those not present in pdata
schemaModel = {'id': new_id, "participants":[{'id': 128, 'participant_id': 2, 'type': 4}, {'id': 129, 'participant_id': 3, 'type': 1}, {'id': 130, 'participant_id': 4, 'type': 2}, {'id': 131, 'participant_id': 6, 'type': 8}]}
# 4.) assign the participants
for participant_assignment in schemaModel["participants"]:
found_participant = False
for elem in pdata:
if elem["participant_id"] == participant_assignment["participant_id"] and elem["type"] == participant_assignment["type"]:
found_participant = True
break
if not found_participant:
nquery = SQLQuery.get_shipcall_post_update_shipcall_participant_map()
ndata = execute_sql_query_standalone(query=nquery, param={"shipcall_id" : new_id, "participant_id" : participant_assignment["participant_id"], "type" : participant_assignment["type"]}, command_type="execute") # existing list of assignments
# 5.) verify the update (5 participants, including the false one)
query = SQLQuery.get_shipcall_participant_map_by_shipcall_id()
pdata = execute_sql_query_standalone(query=query, param={"id" : new_id}, command_type="query") # existing list of assignments
assert len(pdata)==5, f"due to the PUT, there shall now be five participants, as defined in schemaModel."
# 6.) delete the incorrect participant (last entry in the list in this case)
dquery = SQLQuery.get_shipcall_participant_map_delete_by_id()
ddata = execute_sql_query_standalone(query=dquery, param={"existing_id" : pdata[-1].get("id")}, command_type="execute")
# 7.) verify the update (now 4 participants)
query = SQLQuery.get_shipcall_participant_map_by_shipcall_id()
pdata = execute_sql_query_standalone(query=query, param={"id" : new_id}, command_type="query") # existing list of assignments
assert len(pdata)==4, f"due to the PUT, there shall now be five participants, as defined in schemaModel."
finally:
pooledConnection.close()
return
def test_sql_query_get_shipcalls_is_identical_to_legacy_query():
from BreCal.database.sql_queries import create_sql_query_shipcall_get
options = {'past_days':7}
query_refactored = SQLQuery.get_shipcalls(options)
query_legacy = create_sql_query_shipcall_get(options)
assert query_refactored == query_legacy, f"the refactored code to generate the query must be absolutely identical to the legacy version"
return
def test_sql_query_post_ship_is_identical_to_legacy_query():
from BreCal.database.sql_queries import SQLQuery, create_sql_query_ship_post, create_sql_query_ship_put
from BreCal.stubs.ship import get_stub_valid_ship_loaded_model
schemaModel = get_stub_valid_ship_loaded_model()
query_refactored = SQLQuery.get_ship_post(schemaModel)
query_legacy = create_sql_query_ship_post(schemaModel)
assert query_refactored == query_legacy, f"the refactored code to generate the query must be absolutely identical to the legacy version"
return
def test_sql_query_put_ship_is_identical_to_legacy_query():
from BreCal.database.sql_queries import SQLQuery, create_sql_query_ship_post, create_sql_query_ship_put
from BreCal.stubs.ship import get_stub_valid_ship_loaded_model
schemaModel = get_stub_valid_ship_loaded_model()
query_refactored = SQLQuery.get_ship_put(schemaModel)
query_legacy = create_sql_query_ship_put(schemaModel)
assert query_refactored == query_legacy, f"the refactored code to generate the query must be absolutely identical to the legacy version"
return
#schemas = execute_sql_query_standalone(query=SQLQuery.get_berth(), param={})