git_brcal/src/server/BreCal/database/sql_handler.py

import numpy as np
import pandas as pd
import pydapper
import datetime
import typing
import warnings
from BreCal.schemas.model import Shipcall, Ship, Participant, Berth, User, Times, ShipcallParticipantMap
from BreCal.database.enums import ParticipantType
from BreCal.local_db import getPoolConnection
from BreCal.database.sql_queries import SQLQuery
from BreCal.schemas import model
def pandas_series_to_data_model():
    # currently unused placeholder stub
    return
def set_participant_type(x, participant_df) -> int:
    """
    when iterating over each row entry x in the shipcall_participant_map,
    one can update the 'type' column by extracting the matching data from a participant dataframe
    returns: participant_type
    """
    participant_id = x["participant_id"]
    participant_type = participant_df.loc[participant_id, "type"]
    return participant_type
def get_synchronous_shipcall_times_standalone(query_time: pd.Timestamp, all_df_times: pd.DataFrame, delta_threshold=900) -> int:
    """
    This function counts all entries in {all_df_times} whose timestamps lie close to {query_time}.
    It does so by:
    1.) selecting all eta_berth & etd_berth entries
    2.) measuring the timedelta towards {query_time}
    3.) converting the timedelta to total absolute seconds (positive or negative time differences do not matter)
    4.) applying a {delta_threshold} (in seconds) to identify whether two times are too close together
    5.) counting the times where the timedelta is at or below the threshold
    returns: counts
    """
    assert isinstance(query_time, pd.Timestamp) or pd.isnull(query_time), f"expected a timestamp. Found type: {type(query_time)} with value: {query_time}"
    if pd.isnull(query_time):
        return 0
    # get a timedelta for each valid (not Null) time entry
    time_deltas_eta = [(query_time.to_pydatetime() - time_.to_pydatetime()) for time_ in all_df_times.loc[:, "eta_berth"] if not pd.isnull(time_)]
    time_deltas_etd = [(query_time.to_pydatetime() - time_.to_pydatetime()) for time_ in all_df_times.loc[:, "etd_berth"] if not pd.isnull(time_)]
    # consider both eta and etd times
    time_deltas = time_deltas_eta + time_deltas_etd
    # convert the timedeltas to absolute total seconds
    time_deltas = [abs(delta.total_seconds()) for delta in time_deltas]
    # keep only those time deltas which are <= the threshold (a list of booleans)
    time_deltas_filtered = [delta <= delta_threshold for delta in time_deltas]
    # booleans can be added/counted in Python by using sum()
    counts = sum(time_deltas_filtered)  # int
    return counts
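# Example of the counting behaviour (hypothetical data; a doctest-style sketch):
# >>> df = pd.DataFrame({"eta_berth": [pd.Timestamp("2024-01-01 10:05"), pd.NaT],
# ...                    "etd_berth": [pd.NaT, pd.NaT]})
# >>> get_synchronous_shipcall_times_standalone(pd.Timestamp("2024-01-01 10:00"), df)
# 1
# The single valid eta_berth lies 300 s from the query time, i.e. within the
# default 900 s threshold; all NaT entries are skipped.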
def execute_sql_query_standalone(query, param={}, pooledConnection=None, model=None, command_type="query"):
    """
    execute an arbitrary query with a set of parameters, convert the output to a list and return it.
    when the pooled connection is rebuilt here, it will be closed at the end of the function.
    """
    rebuild_pooled_connection = pooledConnection is None
    if rebuild_pooled_connection:
        pooledConnection = getPoolConnection()
    commands = pydapper.using(pooledConnection)
    # example query: "SELECT participant_id, type FROM shipcall_participant_map WHERE shipcall_id=?shipcall_id?"
    try:
        if command_type == "query":
            # with buffered=False, .query creates a generator
            if model is None:
                schemas = commands.query(query, model=dict, param=param, buffered=False)
            else:
                schemas = commands.query(query, model=model, param=param, buffered=False)
            # creates a list of results from the generator
            schemas = [schema for schema in schemas]
        elif command_type == "execute":
            schemas = commands.execute(query, param=param)
        elif command_type == "single":
            sentinel = object()
            # pulls a *single* row from the query. Typically, these queries require an ID within the param dictionary.
            # when providing a model, such as model.Shipcall, the dataset is immediately translated into a data model.
            schemas = commands.query_single_or_default(query, sentinel, param=param) if model is None else commands.query_single_or_default(query, sentinel, param=param, model=model)
            if schemas is sentinel:
                raise Exception("no such record")
        elif command_type == "single_or_none":
            sentinel = object()
            # like "single", but returns None instead of raising an Exception when no row matches.
            schemas = commands.query_single_or_default(query, sentinel, param=param) if model is None else commands.query_single_or_default(query, sentinel, param=param, model=model)
            schemas = None if schemas is sentinel else schemas
        elif command_type == "execute_scalar":
            schemas = commands.execute_scalar(query)
        else:
            raise ValueError(command_type)
    finally:  # if needed, ensure that the pooled connection is closed.
        if rebuild_pooled_connection:
            pooledConnection.close()
    return schemas
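# Usage sketch (hypothetical ids/queries; assumes the BreCal connection pool is reachable):
# >>> ships = execute_sql_query_standalone("SELECT * FROM ship", model=model.Ship, command_type="query")
# >>> n_rows = execute_sql_query_standalone("DELETE FROM times WHERE id=?id?", param={"id": 5}, command_type="execute")
# "query" returns a list, "single"/"single_or_none" a single row (or None),
# "execute" the affected row count, and "execute_scalar" a single scalar value.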
def get_assigned_participant_of_type(shipcall_id: int, participant_type: typing.Union[int, model.ParticipantType]) -> typing.Optional[model.Participant]:
    """obtains the ShipcallParticipantMap of a given shipcall and finds the participant id of a desired type. Finally, returns the respective Participant"""
    spm_shipcall_data = execute_sql_query_standalone(
        query=SQLQuery.get_shipcall_participant_map_by_shipcall_id_and_type(),
        param={"id": shipcall_id, "type": int(participant_type)},
        command_type="query")  # returns a list of matches
    if len(spm_shipcall_data) == 0:
        return None
    query = 'SELECT * FROM participant WHERE id=?participant_id?'
    assigned_participant = execute_sql_query_standalone(
        query=query,
        param={"participant_id": spm_shipcall_data[0]["participant_id"]},
        model=model.Participant,
        command_type="single_or_none"
    )  # returns a single Participant or None
    return assigned_participant
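# Example (hypothetical shipcall id): fetch the agency assigned to shipcall 42, or None if unassigned.
# >>> agency = get_assigned_participant_of_type(42, model.ParticipantType.AGENCY)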
class SQLHandler:
    """
    An object that reads SQL queries from the sql_connection and stores the results in pandas DataFrames.
    The object can read all available tables into memory at once when providing 'read_all=True'.
    # #TODO_initialization: shipcall_tug_map, user_role_map & role_securable_map might be mapped to the respective dataframes
    """
    def __init__(self, sql_connection, read_all=False):
        self.sql_connection = sql_connection
        self.all_schemas = self.get_all_schemas_from_mysql()
        self.build_str_to_model_dict()
        if read_all:
            self.read_all(self.all_schemas)
    def execute_sql_query(self, sql_connection, query, param):
        """
        this method is best used in combination with a python context-manager, such as:
            with mysql.connector.connect(**mysql_connection_data) as sql_connection:
                schema = sql_handler.execute_sql_query(sql_connection, query)
        """
        schemas = execute_sql_query_standalone(query, param, pooledConnection=sql_connection)
        return schemas
    def get_all_schemas_from_mysql(self):
        with self.sql_connection.cursor(buffered=True) as cursor:
            cursor.execute("SHOW TABLES")
            schema = cursor.fetchall()
            all_schemas = [schem[0] for schem in schema]
        return all_schemas
    def build_str_to_model_dict(self):
        """
        creates a simple dictionary, which maps a string to a data model class
        e.g.,
            'ship' -> BreCal.schemas.model.Ship
        """
        self.str_to_model_dict = {
            "shipcall": Shipcall, "ship": Ship, "participant": Participant, "berth": Berth, "user": User, "times": Times,
            "shipcall_participant_map": ShipcallParticipantMap
        }
        return
    def read_mysql_table_to_df(self, table_name: str):
        """determine a {table_name}, which will be read from a mysql server. returns a pandas DataFrame with the respective data"""
        with self.sql_connection.cursor(buffered=True) as cursor:  # df = pd.read_sql(sql=f"SELECT * FROM {table_name}", con=self.sql_connection)
            # 1.) get the column names
            cursor.execute(f"DESCRIBE {table_name}")
            cols = cursor.fetchall()
            column_names = [col_name[0] for col_name in cols]
            # 2.) get the data tuples
            cursor.execute(f"SELECT * FROM {table_name}")
            data = cursor.fetchall()
            # 3.) map the data tuples to the correct column names
            data = [{k: v for k, v in zip(column_names, dat)} for dat in data]
            # 4.) build a dataframe from the respective data models (which ensures the correct data type)
            df = self.build_df_from_data_and_name(data, table_name)
        return df
    def build_df_from_data_and_name(self, data, table_name):
        data_model = self.str_to_model_dict.get(table_name)
        if data_model is not None:
            df = pd.DataFrame([data_model(**dat) for dat in data], columns=list(data_model.__annotations__.keys()))
        else:
            df = pd.DataFrame([dat for dat in data])
        return df
    def mysql_to_df(self, query, table_name):
        """provide an arbitrary sql query that should be read from a mysql server {sql_connection}. returns a pandas DataFrame with the obtained data"""
        with self.sql_connection.cursor(buffered=True) as cursor:  # df = pd.read_sql(query, self.sql_connection).convert_dtypes()
            # 1.) get the column names
            cursor.execute(f"DESCRIBE {table_name}")
            cols = cursor.fetchall()
            column_names = [col_name[0] for col_name in cols]
            # 2.) get the data tuples
            cursor.execute(query)
            data = cursor.fetchall()
            # 3.) map the data tuples to the correct column names
            data = [{k: v for k, v in zip(column_names, dat)} for dat in data]
            # 4.) build a dataframe from the respective data models (which ensures the correct data type)
            df = self.build_df_from_data_and_name(data, table_name)
        if 'id' in df.columns:
            df = df.set_index('id', inplace=False)  # avoid inplace updates, so the raw sql remains unchanged
        return df
    def read_all(self, all_schemas):
        # create a dictionary, which maps every mysql schema to a pandas DataFrame
        self.df_dict = self.build_full_mysql_df_dict(all_schemas)
        # update the 'participants' column in 'shipcall'
        self.initialize_shipcall_participant_list()
        # updating the 'type' in shipcall_participant_map is fully deprecated:
        # self.add_participant_type_to_map()
        return
    def build_full_mysql_df_dict(self, all_schemas):
        """given a list of strings {all_schemas}, every schema will be read as an individual pandas DataFrame into a dictionary with the respective keys. returns: dictionary {schema_name: pd.DataFrame}"""
        mysql_df_dict = {}
        for schem in all_schemas:
            query = f"SELECT * FROM {schem}"
            mysql_df_dict[schem] = self.mysql_to_df(query, table_name=schem)
        return mysql_df_dict
    def initialize_shipcall_participant_list(self):
        """
        iteratively applies the .get_participants method to each shipcall.
        the method updates the 'participants' column.
        """
        # 1.) get all shipcalls
        df = self.df_dict.get('shipcall')
        # 2.) iterate over each individual shipcall, obtain the id (pandas calls it 'name')
        #     and apply the 'get_participants' method, which returns a list:
        #     if the shipcall_id exists, the list contains ids; otherwise it is blank
        df['participants'] = df.apply(
            lambda x: self.get_participants(x.name),
            axis=1)
        return
    def add_participant_type_to_map(self):
        """
        applies a lambda function, where the 'type'-column in the shipcall_participant_map is updated by reading the
        respective data from the participants. Updates the shipcall_participant_map inplace.
        """
        raise Exception("deprecated! Overwriting the shipcall_participant_map may cause harm, as a participant with multi-flag might be wrongfully assigned to multiple roles simultaneously.")
        #spm = self.df_dict["shipcall_participant_map"]
        #participant_df = self.df_dict["participant"]
        #spm.loc[:,"type"] = spm.loc[:].apply(lambda x: set_participant_type(x, participant_df=participant_df), axis=1)
        #self.df_dict["shipcall_participant_map"] = spm
        return
    def get_assigned_participants(self, shipcall) -> pd.DataFrame:
        """return each participant of a respective shipcall, filtered by the shipcall id"""
        # get the shipcall_participant_map
        spm = self.df_dict["shipcall_participant_map"]
        assigned_participants = spm.loc[spm["shipcall_id"] == shipcall.id]
        return assigned_participants
    def get_assigned_participants_by_type(self, assigned_participants: pd.DataFrame, participant_type: ParticipantType):
        """filters a dataframe of assigned_participants by the provided type enumerator"""
        if isinstance(participant_type, int):
            participant_type = ParticipantType(participant_type)
        assigned_participants_of_type = assigned_participants.loc[[participant_type in ParticipantType(int(pt_)) for pt_ in list(assigned_participants["type"].values)]]
        #assigned_participants_of_type = assigned_participants.loc[assigned_participants["type"]==participant_type.value]
        return assigned_participants_of_type
    def check_if_any_participant_of_type_is_unassigned(self, shipcall, *args: ParticipantType) -> bool:
        """
        given a list of input arguments, where each item is a participant type, the method determines whether at least
        one participant was assigned for that type. Returns a boolean: whether any of the required participants is unassigned.
        This method is extensively used for the validation rules 0001, where the header is checked beforehand to identify
        whether the respective participant type is assigned already. (See the usage sketch below.)
        """
        assigned_participants = self.get_assigned_participants(shipcall)
        unassigned = []  # becomes a list of booleans
        for participant_type in args:
            assignments_of_type = self.get_assigned_participants_by_type(assigned_participants, participant_type=participant_type)
            unassignment = len(assignments_of_type) == 0  # a participant type does not exist when there is no match
            unassigned.append(unassignment)
        return any(unassigned)  # returns a single boolean: whether ANY of the types is not assigned
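    # Usage sketch (hypothetical shipcall object; PILOT is an assumed ParticipantType member):
    # >>> handler.check_if_any_participant_of_type_is_unassigned(shipcall, ParticipantType.AGENCY, ParticipantType.PILOT)
    # True   # as soon as at least one of the requested types has no assignment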
    def standardize_model_str(self, model_str: str) -> str:
        """check if the 'model_str' is valid and apply lowercasing to the string"""
        model_str = model_str.lower()
        assert model_str in list(self.df_dict.keys()), f"cannot find the requested 'model_str' in mysql: {model_str}"
        return model_str
    def get_data(self, id: int, model_str: str):
        """
        obtains {id} from the respective mysql database and builds a data model from that.
        the id should match the 'id'-column in the mysql schema.
        returns: data model, such as Ship, Shipcall, etc.
        e.g.,
            data = self.get_data(0, "shipcall")
        returns a Shipcall object
        """
        model_str = self.standardize_model_str(model_str)
        df = self.df_dict.get(model_str)
        data = self.df_loc_to_data_model(df, id, model_str)
        return data
    def get_all(self, model_str: str) -> list:
        """
        given a model string (e.g., 'shipcall'), return a list of all
        data models of that type from the sql
        """
        model_str = self.standardize_model_str(model_str)
        all_ids = self.df_dict.get(model_str).index
        all_data = [
            self.get_data(_aid, model_str)
            for _aid in all_ids
        ]
        return all_data
    def df_loc_to_data_model(self, df, id, model_str, loc_type: str = "loc"):
        if not len(df) > 0:
            warnings.warn(f"empty dataframe in SQLHandler.df_loc_to_data_model for model type: {model_str}\n")
            return df
        # get a pandas series from the dataframe
        series = df.loc[id] if loc_type == "loc" else df.iloc[id]
        # get the respective data model class
        data_model = self.str_to_model_dict.get(model_str, None)
        assert data_model is not None, f"could not find the requested model_str: {model_str}"
        # build 'data' and fill the data model object
        # convert the 'id' to an integer, so the np.uint64 (used by pandas) is convertible to mysql
        data = {**{'id': int(id)}, **series.to_dict()}  # 'id' must be added manually, as .to_dict does not contain the index, which was set with .set_index
        data = data_model(**data)
        return data
    def filter_df_by_participant_type(self, df, participant_type: typing.Union[int, ParticipantType]) -> pd.DataFrame:
        """
        As ParticipantTypes are Flag objects, a dataframe's integer might resemble multiple participant types simultaneously.
        This function allows for more complex filters, as the IntFlag allows more complex queries
        e.g.:
            ParticipantType(6) combines the flags 2 and 4 (2 + 4 = 6)
            ParticipantType(2) in ParticipantType(6) == True   # 6 is both, 2 and 4
            ParticipantType(1) in ParticipantType(6) == False  # 6 is both, 2 and 4, but not 1
        """
        if isinstance(participant_type, int):
            participant_type = ParticipantType(participant_type)
        filtered_df = df.loc[[participant_type in ParticipantType(df_pt) for df_pt in list(df["participant_type"].values)]]
        return filtered_df
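    # Minimal sketch (hypothetical frame): rows with participant_type 6 (= 2|4) match a
    # query for flag 2, while rows with participant_type 1 do not.
    # >>> df = pd.DataFrame({"participant_type": [6, 1]})
    # >>> handler.filter_df_by_participant_type(df, 2)   # returns only the first row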
    def get_times_for_participant_type(self, df_times, participant_type: int):
        filtered_series = self.filter_df_by_participant_type(df_times, participant_type)
        #filtered_series = df_times.loc[df_times["participant_type"]==participant_type]
        if len(filtered_series) == 0:
            return None
        if not len(filtered_series) <= 1:
            # correcting the error: ERROR:root:found multiple results
            # however, a warning will still be issued
            warnings.warn(f"found multiple results in function SQLHandler.get_times_for_participant_type\nConsidering only the first match!\nAffected Times Indexes: {filtered_series.index}")
        times = self.df_loc_to_data_model(filtered_series, id=0, model_str='times', loc_type="iloc")  # use iloc! to retrieve the first result
        return times
    def dataframe_to_data_model_list(self, df, model_str) -> list:
        model_str = self.standardize_model_str(model_str)
        all_ids = df.index
        all_data = [
            self.df_loc_to_data_model(df, _aid, model_str)
            for _aid in all_ids
        ]
        return all_data
    def get_participants(self, shipcall_id: int) -> list:
        """
        given a {shipcall_id}, obtain the respective list of participants.
        when there are no participants, return a blank list
        returns: participant_id_list, where every element is an int
        """
        df = self.df_dict.get("shipcall_participant_map")
        if 'shipcall_id' in list(df.columns):
            df = df.set_index('shipcall_id', inplace=False)
        # the 'if' expression ensures that no Exception is raised when the shipcall_id is not present in the df
        participant_id_list = df.loc[shipcall_id, "participant_id"].tolist() if shipcall_id in list(df.index) else []
        if not isinstance(participant_id_list, list):
            participant_id_list = [participant_id_list]
        return participant_id_list
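    # Example (hypothetical ids):
    # >>> handler.get_participants(7)
    # [3, 12]   # ids from the shipcall_participant_map; [] when shipcall 7 has no entries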
    def get_times_of_shipcall(self, shipcall) -> pd.DataFrame:
        df_times = self.df_dict.get('times')  # -> pd.DataFrame
        df_times = df_times.loc[df_times["shipcall_id"] == shipcall.id]
        return df_times
    def get_times_for_agency(self, non_null_column=None) -> pd.DataFrame:
        """
        options:
        non_null_column:
            None or str. If provided, the 'non_null_column'-column of the dataframe will be filtered,
            so only entries with provided values are returned (filters all NaN and NaT entries)
        """
        # get all times
        df_times = self.df_dict.get('times')  # -> pd.DataFrame
        # filter out all NaN and NaT entries
        if non_null_column is not None:
            # in the Pandas documentation, it says for .isnull():
            # "This function takes a scalar or array-like object and indicates whether values are missing
            #  (NaN in numeric arrays, None or NaN in object arrays, NaT in datetimelike)."
            df_times = df_times.loc[~df_times[non_null_column].isnull()]  # NOT null filter
        # filter by the agency participant_type
        times_agency = self.filter_df_by_participant_type(df_times, ParticipantType.AGENCY.value)
        #times_agency = df_times.loc[df_times["participant_type"]==ParticipantType.AGENCY.value]
        return times_agency
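    # Example: agency times that actually carry an eta_berth value (NaN/NaT rows are dropped first).
    # >>> times_agency = handler.get_times_for_agency(non_null_column="eta_berth")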
    def filter_df_by_key_value(self, df, key, value) -> pd.DataFrame:
        return df.loc[df[key] == value]
    def get_unique_ship_counts(self, all_df_times: pd.DataFrame, times_agency: pd.DataFrame, query: str, rounding: str = "min", maximum_threshold=3):
        """given a dataframe of all agency times, count the times entries matching the agency value of the {query} column. returns: counts (int)"""
        # #deprecated!
        warnings.warn(f"SQLHandler.get_unique_ship_counts is deprecated. Instead, please use SQLHandler.count_synchronous_shipcall_times")
        # optional: rounding
        if rounding is not None:
            all_df_times.loc[:, query] = pd.to_datetime(all_df_times.loc[:, query]).dt.round(rounding)  # e.g., 'min' --- # correcting the error: 'AttributeError: Can only use .dt accessor with datetimelike values'
            query_time_agency = pd.to_datetime(times_agency[query]).iloc[0].round(rounding)  # e.g., 'min'
        else:
            # without rounding, the raw agency value is used (previously left unbound, raising an UnboundLocalError)
            query_time_agency = pd.to_datetime(times_agency[query]).iloc[0]
        # after rounding, filter {all_df_times}, so only those entries which match the current query are of interest
        # takes 'times_agency' to sample which value should match
        all_df_times = all_df_times.loc[all_df_times[query] == query_time_agency]
        # finally, count all remaining entries
        values = all_df_times.loc[:, query]
        # get unique entries and counts
        counts = len(values)  # unique, counts = np.unique(values, return_counts=True)
        return counts  # (values, unique, counts)
    def count_synchronous_shipcall_times(self, query_time: pd.Timestamp, all_df_times: typing.Optional[pd.DataFrame], delta_threshold=900) -> int:
        """count all times entries which are too close to the query_time, as determined by {delta_threshold}. when all_df_times is None, the full 'times' table is used. returns: counts (int)"""
        if all_df_times is None:
            all_df_times = self.df_dict.get("times")
        return get_synchronous_shipcall_times_standalone(query_time, all_df_times, delta_threshold)
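# Usage sketch (hypothetical connection data; assumes a mysql.connector connection to the BreCal schema):
# >>> import mysql.connector
# >>> with mysql.connector.connect(**mysql_connection_data) as sql_connection:
# ...     handler = SQLHandler(sql_connection, read_all=True)
# ...     shipcall = handler.get_data(1, "shipcall")
# ...     df_times = handler.get_times_of_shipcall(shipcall)
# ...     n_close = handler.count_synchronous_shipcall_times(pd.Timestamp.now(), df_times)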