This repository has been archived on 2025-02-17. You can view files and clone it, but cannot push or open issues or pull requests.
BreCal/src/lib_brecal_utils/brecal_utils/database/update_database.py

96 lines
4.4 KiB
Python

import json
import pydapper
import pandas as pd
import mysql.connector
from brecal_utils.database.sql_handler import SQLHandler
from brecal_utils.validators.validation_rules import ValidationRules
from BreCal.schemas.model import Shipcall
def update_shipcall_in_mysql_database(sql_connection, shipcall:Shipcall, relevant_keys:list = ["evaluation", "evaluation_message"]):
"""
given an individual schemaModel (e.g., Shipcall.__dict__), update the entry within the mysql database
options:
sql_connection: an instance of mysql.connector.connect
shipcall: a Shipcall data model
relevant_keys: a list of the keys to be updated. Should never contain 'id', which is immutable
returns: (query, affected_rows)
"""
assert not "id" in relevant_keys, f"the 'id' key should never be updated."
schemaModel = shipcall.__dict__
commands = pydapper.using(sql_connection)
sentinel = object()
theshipcall = commands.query_single_or_default("SELECT * FROM shipcall where id = ?id?", sentinel, param={"id" : schemaModel["id"]})
if theshipcall is sentinel:
return json.dumps("no such record"), 404, {'Content-Type': 'application/json; charset=utf-8'}
query = build_mysql_query_to_update_shipcall(shipcall=shipcall, relevant_keys=relevant_keys)
affected_rows = commands.execute(query, param=schemaModel)
return (query, affected_rows)
def build_mysql_query_to_update_shipcall(shipcall, relevant_keys:list):
"""builds a mysql query, which updates the shipcall table. In particular, the provided shipcall will be updated for each key in {relevant_keys}"""
schemaModel = shipcall.__dict__
# prepare prefix and suffix. Then build the body of the query
prefix = "UPDATE shipcall SET "
suffix = "where id = ?id?"
body = ", ".join([f"{key} = ?{key}? " for key in schemaModel.keys() if (key in relevant_keys)]) # .join ignores the first ', ', which equals the 'isNotFirst' boolean-loop
# build query
query = f"{prefix}{body}{suffix}"
return query
def update_all_shipcalls_in_mysql_database(sql_connection, sql_handler:SQLHandler, shipcall_df:pd.DataFrame)->None:
"""
iterates over each shipcall_id in a shipcall dataframe, builds Shipcall data models and updates those in the sql database, which
is located in {sql_connection}
options:
sql_connection: an instance of mysql.connector.connect
sql_handler: an SQLHandler instance
shipcall_df: dataframe, which stores the data that is used to retrieve the shipcall data models (that are then updated in the database)
"""
for shipcall_id in shipcall_df.index:
shipcall = sql_handler.df_loc_to_data_model(df=shipcall_df, id=shipcall_id, model_str="shipcall")
update_shipcall_in_mysql_database(sql_connection, shipcall=shipcall, relevant_keys = ["evaluation", "evaluation_message"])
return
def run_validation_rules(mysql_connector_instance, debug=False)->pd.DataFrame:
"""
options:
mysql_connector_instance: an instance created by the mysql.connector.connect() call. It is advised to use Python's context manager to close the connection after finished.
e.g.,
with mysql.connector.connect(**mysql_connection_data) as mysql_connector_instance:
run_validation_rules(mysql_connector_instance)
returns None
"""
sql_handler = SQLHandler(sql_connection=mysql_connector_instance, read_all=True)
vr = ValidationRules(sql_handler)
shipcall_df = sql_handler.df_dict.get("shipcall")
# placeholder: filter shipcalls. For example, exclude historic entries.
shipcall_df = vr.evaluate_shipcalls(shipcall_df)
if debug:
return shipcall_df
# iterate over each shipcall in shipcall_df and update the respective entry in the mysql database
update_all_shipcalls_in_mysql_database(sql_connection=mysql_connector_instance, sql_handler=sql_handler, shipcall_df=shipcall_df)
return shipcall_df
def update_shipcall_evaluation_state(mysql_connection_data)->pd.DataFrame:
"""
single line function to connect to a mysql database (using the {mysql_connection_data}), evaluate each shipcall (bei traffic state)
and finally, update those in the database.
"""
with mysql.connector.connect(**mysql_connection_data) as mysql_connector_instance:
shipcall_df = run_validation_rules(mysql_connector_instance=mysql_connector_instance, debug=False)
return shipcall_df