434 lines
16 KiB
Python
434 lines
16 KiB
Python
import logging
|
|
|
|
|
|
def create_sql_query_shipcall_get(options:dict)->str:
    """
    Build the SELECT statement that lists recent shipcalls, ordered by eta.

    A shipcall of type 1 is returned when either its joined times row
    (participant_type = 8) has an eta_berth, or the shipcall itself has an eta,
    that is not older than `past_days` days. Shipcalls of type 2 or 3 are
    filtered the same way, but on etd_berth / etd.

    args:
        options : dict. Must contain the key 'past_days'. The value is rendered
            with '%d', so it has to be a number.

    returns:
        str. The complete SQL query.

    raises:
        KeyError: when 'past_days' is missing from options.
        TypeError: when the 'past_days' value cannot be formatted with '%d'.
    """
    days_back = options["past_days"]

    # every date comparison in the WHERE clause uses the same cutoff expression
    cutoff = "DATE(NOW() - INTERVAL %d DAY)" % (days_back,)

    columns = (
        "s.id as id", "ship_id", "type", "eta", "voyage", "etd",
        "arrival_berth_id", "departure_berth_id", "tug_required", "pilot_required",
        "flags", "s.pier_side", "bunkering", "replenishing_terminal", "replenishing_lock",
        "draft", "tidal_window_from", "tidal_window_to", "rain_sensitive_cargo",
        "recommended_tugs", "anchored", "moored_lock", "canceled", "evaluation",
        "evaluation_message", "evaluation_time", "evaluation_notifications_sent",
        "s.created as created", "s.modified as modified", "time_ref_point",
    )

    return (
        "SELECT " + ", ".join(columns) + " "
        "FROM shipcall s "
        "LEFT JOIN times t ON t.shipcall_id = s.id AND t.participant_type = 8 "
        "WHERE "
        "(type = 1 AND "
        f"((t.id IS NOT NULL AND t.eta_berth >= {cutoff}) OR "
        f"(eta >= {cutoff}))) OR "
        "((type = 2 OR type = 3) AND "
        f"((t.id IS NOT NULL AND t.etd_berth >= {cutoff}) OR "
        f"(etd >= {cutoff}))) "
        "ORDER BY eta"
    )
|
|
|
|
|
|
def create_sql_query_shipcall_post(schemaModel:dict)->str:
    """
    Build the INSERT statement for a new shipcall from the keys of a schema model.

    Every key of `schemaModel` becomes a column with a matching named placeholder
    of the form ?key?, except for the keys listed in `skipped_keys`, which are
    left out entirely. The 'type' column is bound to the ?type_value? placeholder
    instead of ?type?.

    args:
        schemaModel : dict. Only the keys are used; values are ignored.

    returns:
        str. e.g. "INSERT INTO shipcall (ship_id,type) VALUES (?ship_id?,?type_value?)"
    """
    # keys that must never appear in the column list of the INSERT
    skipped_keys = (
        "id", "participants", "created", "modified",
        "evaluation", "evaluation_message", "type_value", "evaluation_value",
    )
    # columns whose placeholder name differs from the column name.
    # NOTE: 'evaluation' is always skipped, so no remapping to 'evaluation_value' is needed.
    placeholder_names = {"type": "type_value"}

    columns = [key for key in schemaModel if key not in skipped_keys]
    placeholders = ["?" + placeholder_names.get(key, key) + "?" for key in columns]

    return "INSERT INTO shipcall (" + ",".join(columns) + ") VALUES (" + ",".join(placeholders) + ")"
|
|
|
|
def create_sql_query_shipcall_put(schemaModel:dict)->str:
    """
    Build the UPDATE statement for an existing shipcall from the keys of a schema model.

    Every key of `schemaModel` becomes a "column = ?column? " assignment, except
    for the keys listed in `skipped_keys`, which are left out. The 'type' column
    is bound to the ?type_value? placeholder instead of ?type?. The row is
    selected with "WHERE id = ?id?".

    args:
        schemaModel : dict. Only the keys are used; values are ignored.

    returns:
        str. e.g. "UPDATE shipcall SET voyage = ?voyage? , type = ?type_value? WHERE id = ?id?"
        When no key remains after filtering, the SET clause is empty.
    """
    # keys that must never appear in the SET clause
    skipped_keys = (
        "id", "participants", "created", "modified",
        "evaluation", "evaluation_message", "type_value", "evaluation_value",
    )
    # columns whose placeholder name differs from the column name.
    # NOTE: 'evaluation' is always skipped, so no remapping to 'evaluation_value' is needed.
    placeholder_names = {"type": "type_value"}

    # each assignment keeps its trailing blank, so the joined result reads "a = ?a? , b = ?b? "
    assignments = [
        key + " = ?" + placeholder_names.get(key, key) + "? "
        for key in schemaModel
        if key not in skipped_keys
    ]

    return "UPDATE shipcall SET " + ", ".join(assignments) + "WHERE id = ?id?"
|
|
|
|
|
|
def create_sql_query_history_post()->str:
    """
    Return the INSERT statement that writes a history row with operation = 1.

    The participant, shipcall and user ids are supplied through the ?pid?, ?scid?
    and ?uid? placeholders; timestamp and eta are both set to CURRENT_TIMESTAMP,
    type is fixed to 1.
    """
    return (
        "INSERT INTO history (participant_id, shipcall_id, user_id, timestamp, eta, type, operation) "
        "VALUES (?pid?, ?scid?, ?uid?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, 1, 1)"
    )
|
|
|
|
def create_sql_query_history_put()->str:
    """
    Return the INSERT statement that writes a history row with operation = 2.

    The participant, shipcall and user ids are supplied through the ?pid?, ?scid?
    and ?uid? placeholders; timestamp and eta are both set to CURRENT_TIMESTAMP,
    type is fixed to 1.
    """
    return (
        "INSERT INTO history (participant_id, shipcall_id, user_id, timestamp, eta, type, operation) "
        "VALUES (?pid?, ?scid?, ?uid?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, 1, 2)"
    )
|
|
|
|
def create_sql_query_user_put(schemaModel:dict)->str:
    """
    Build the UPDATE statement for a user from the keys of a schema model.

    Every key of `schemaModel` becomes a "column = ?column? " assignment, except
    'id', 'old_password' and 'new_password', which are left out. The row is
    selected with "WHERE id = ?id?".

    args:
        schemaModel : dict. Only the keys are used; values are ignored.

    returns:
        str. e.g. "UPDATE user SET first_name = ?first_name? WHERE id = ?id?"
        When no key remains after filtering, the SET clause is empty.
    """
    skipped_keys = ("id", "old_password", "new_password")

    # each assignment keeps its trailing blank, so the joined result reads "a = ?a? , b = ?b? "
    assignments = [key + " = ?" + key + "? " for key in schemaModel if key not in skipped_keys]

    return "UPDATE user SET " + ", ".join(assignments) + "WHERE id = ?id?"
|
|
|
|
def create_sql_query_ship_post(schemaModel:dict)->str:
    """
    Build the INSERT statement for a new ship from the keys of a schema model.

    Every key of `schemaModel` becomes a column with a matching ?key? placeholder,
    except 'id', 'created' and 'modified', which are left out.

    args:
        schemaModel : dict. Only the keys are used; values are ignored.

    returns:
        str. e.g. "INSERT INTO ship (name,imo) VALUES (?name?,?imo?)"
    """
    skipped_keys = ("id", "created", "modified")

    columns = [key for key in schemaModel if key not in skipped_keys]
    placeholders = ["?" + key + "?" for key in columns]

    return "INSERT INTO ship (" + ",".join(columns) + ") VALUES (" + ",".join(placeholders) + ")"
|
|
|
|
def create_sql_query_ship_put(schemaModel:dict)->str:
    """
    Build the UPDATE statement for a ship from the keys of a schema model.

    Every key of `schemaModel` becomes a "column = ?column? " assignment, except
    'id', 'created' and 'modified', which are left out. The row is selected with
    "WHERE id = ?id?".

    args:
        schemaModel : dict. Only the keys are used; values are ignored.

    returns:
        str. e.g. "UPDATE ship SET name = ?name? , imo = ?imo? WHERE id = ?id?"
        When no key remains after filtering, the SET clause is empty.
    """
    skipped_keys = ("id", "created", "modified")

    # each assignment keeps its trailing blank, so the joined result reads "a = ?a? , b = ?b? "
    assignments = [key + " = ?" + key + "? " for key in schemaModel if key not in skipped_keys]

    return "UPDATE ship SET " + ", ".join(assignments) + "WHERE id = ?id?"
|
|
|
|
|
|
|
|
class SQLQuery():
    """
    This class provides quick access to different SQL query functions, which create default queries for the BreCal package.
    Each method is a staticmethod and is callable without initializing the SQLQuery object.

    Example:
        SQLQuery.get_berth()

    All methods only build and return the query string; none of them executes it.
    Parameters are referenced through named placeholders of the form ?name?.
    """

    # keys of a shipcall schema model that never become a column in INSERT / UPDATE statements
    _SHIPCALL_SKIPPED_KEYS = (
        "id", "participants", "created", "modified",
        "evaluation", "evaluation_message", "type_value", "evaluation_value",
    )
    # shipcall columns whose placeholder name differs from the column name
    _SHIPCALL_PLACEHOLDER_NAMES = {"type": "type_value"}
    # keys of a ship schema model that never become a column in INSERT / UPDATE statements
    _SHIP_SKIPPED_KEYS = ("id", "created", "modified")
    # keys of a user schema model that never become a column in the UPDATE statement
    _USER_SKIPPED_KEYS = ("id", "old_password", "new_password")

    def __init__(self) -> None:
        pass

    @staticmethod
    def get_berth()->str:
        """Select all berths with deleted = 0, ordered by name."""
        query = "SELECT id, name, `lock`, owner_id, authority_id, created, modified, deleted FROM berth WHERE deleted = 0 ORDER BY name"
        return query

    @staticmethod
    def get_history()->str:
        """Select the history rows of one shipcall (placeholder: ?shipcallid?)."""
        query = "SELECT id, participant_id, shipcall_id, timestamp, eta, type, operation FROM history WHERE shipcall_id = ?shipcallid?"
        return query

    @staticmethod
    def get_user()->str:
        """Select a user whose user_name or user_email equals ?username?."""
        query = ("SELECT id, participant_id, first_name, last_name, user_name, user_email, user_phone, password_hash, "
                 "api_key, notify_email, notify_whatsapp, notify_signal, notify_popup, created, modified FROM user "
                 "WHERE user_name = ?username? OR user_email = ?username?")
        return query

    @staticmethod
    def get_notifications()->str:
        """Select the notifications of one shipcall (placeholder: ?scid?)."""
        query = ("SELECT id, shipcall_id, level, type, message, created, modified FROM notification "
                 "WHERE shipcall_id = ?scid?")
        return query

    @staticmethod
    def get_participant_by_user_id()->str:
        """Select the participant that the user ?userid? belongs to."""
        query = "SELECT p.id as id, p.name as name, p.street as street, p.postal_code as postal_code, p.city as city, p.type as type, p.flags as flags, p.created as created, p.modified as modified, p.deleted as deleted FROM participant p INNER JOIN user u WHERE u.participant_id = p.id and u.id = ?userid?"
        return query

    @staticmethod
    def get_all_participants()->str:
        """
        Select every row of the participant table, ordered by name.

        NOTE(review): this query was originally defined as `get_participants`, but a second
        definition of the same name further down silently replaced it, so it could never be
        reached. It is exposed under its own name here; `get_participants` keeps the behavior
        it effectively had at runtime. Confirm which callers expect which query.
        """
        query = "SELECT id, name, street, postal_code, city, type, flags, created, modified, deleted FROM participant p ORDER BY p.name"
        return query

    @staticmethod
    def get_shipcalls(options:dict=None)->str:
        """
        Build the SELECT statement that lists recent shipcalls, ordered by eta.

        For integer input this yields the same query as create_sql_query_shipcall_get(options).

        args:
            options : dict. Must contain the key 'past_days'. Defaults to {'past_days': 3}.
                The value is coerced with int() before it is placed into the query, so
                that arbitrary text cannot be injected into the SQL statement.

        raises:
            AssertionError: when 'past_days' is missing from options.
            ValueError / TypeError: when the 'past_days' value cannot be converted to int.
        """
        if options is None:
            options = {'past_days': 3}

        # raised explicitly (instead of an assert statement), so that the check survives `python -O`
        if 'past_days' not in options:
            raise AssertionError("there must be a key 'past_days' in the options, which determines, how recent the returned list of shipcalls shall be.")
        past_days = int(options['past_days'])

        query = ("SELECT s.id as id, ship_id, type, eta, voyage, etd, arrival_berth_id, departure_berth_id, tug_required, pilot_required, "
                 "flags, s.pier_side, bunkering, replenishing_terminal, replenishing_lock, draft, tidal_window_from, "
                 "tidal_window_to, rain_sensitive_cargo, recommended_tugs, anchored, moored_lock, canceled, evaluation, "
                 "evaluation_message, evaluation_time, evaluation_notifications_sent, s.created as created, s.modified as modified, time_ref_point "
                 "FROM shipcall s "
                 "LEFT JOIN times t ON t.shipcall_id = s.id AND t.participant_type = 8 "
                 "WHERE "
                 "(type = 1 AND "
                 f"((t.id IS NOT NULL AND t.eta_berth >= DATE(NOW() - INTERVAL {past_days} DAY)) OR "
                 f"(eta >= DATE(NOW() - INTERVAL {past_days} DAY)))) OR "
                 "((type = 2 OR type = 3) AND "
                 f"((t.id IS NOT NULL AND t.etd_berth >= DATE(NOW() - INTERVAL {past_days} DAY)) OR "
                 f"(etd >= DATE(NOW() - INTERVAL {past_days} DAY)))) "
                 "ORDER BY eta")
        return query

    @staticmethod
    def get_ships()->str:
        """Select every row of the ship table, ordered by name."""
        query = "SELECT id, name, imo, callsign, participant_id, length, width, is_tug, bollard_pull, eni, created, modified, deleted FROM ship ORDER BY name"
        return query

    @staticmethod
    def get_ship_by_id()->str:
        """Select one ship by its id (placeholder: ?id?)."""
        query = "SELECT * FROM ship where id = ?id?"
        return query

    @staticmethod
    def get_times()->str:
        """Select the times rows of one shipcall (placeholder: ?scid?)."""
        query = ("SELECT id, eta_berth, eta_berth_fixed, etd_berth, etd_berth_fixed, lock_time, lock_time_fixed, "
                 "zone_entry, zone_entry_fixed, operations_start, operations_end, remarks, shipcall_id, participant_id, "
                 "berth_id, berth_info, pier_side, participant_type, created, modified, ata, atd, eta_interval_end, etd_interval_end FROM times "
                 "WHERE times.shipcall_id = ?scid?")
        return query

    @staticmethod
    def get_user_by_id()->str:
        """Select one user by its id (placeholder: ?id?)."""
        query = "SELECT * FROM user where id = ?id?"
        return query

    @staticmethod
    def get_user_put(schemaModel:dict)->str:
        """
        Build the UPDATE statement for a user from the keys of `schemaModel`.

        The keys 'id', 'old_password' and 'new_password' are left out of the SET clause.
        Yields the same query as create_sql_query_user_put(schemaModel).
        """
        prefix = "UPDATE user SET "
        suffix = "WHERE id = ?id?"
        center = [f"{key} = ?{key}? " for key in schemaModel if key not in SQLQuery._USER_SKIPPED_KEYS]

        query = prefix + ", ".join(center) + suffix
        return query

    @staticmethod
    def get_update_user_password()->str:
        """Update the password_hash of one user (placeholders: ?password_hash?, ?id?)."""
        query = "UPDATE user SET password_hash = ?password_hash? WHERE id = ?id?"
        return query

    @staticmethod
    def get_participants()->str:
        """
        Select participant_id and type of all participants assigned to one shipcall
        (placeholder: ?shipcall_id?).

        NOTE(review): an earlier method of the same name (selecting from the participant
        table) was overridden by this one; that query is available as get_all_participants().
        """
        query = "SELECT participant_id, type FROM shipcall_participant_map WHERE shipcall_id=?shipcall_id?"
        return query

    @staticmethod
    def get_participant_from_id()->str:
        """Select id, type and flags of one participant (placeholder: ?participant_id?)."""
        query = "SELECT id, type, flags FROM participant WHERE id=?participant_id?"
        return query

    @staticmethod
    def get_shipcall_post(schemaModel:dict)->str:
        """
        Build the INSERT statement for a new shipcall from the keys of `schemaModel`.

        Keys listed in _SHIPCALL_SKIPPED_KEYS are left out. The 'type' column is bound to
        the ?type_value? placeholder. Yields the same query as
        create_sql_query_shipcall_post(schemaModel).
        """
        columns = [key for key in schemaModel if key not in SQLQuery._SHIPCALL_SKIPPED_KEYS]
        placeholders = [f"?{SQLQuery._SHIPCALL_PLACEHOLDER_NAMES.get(key, key)}?" for key in columns]

        query = "INSERT INTO shipcall (" + ",".join(columns) + ") VALUES (" + ",".join(placeholders) + ")"
        return query

    @staticmethod
    def get_last_insert_id()->str:
        """Select the id generated by the last INSERT of the connection."""
        query = "select last_insert_id()"
        return query

    @staticmethod
    def get_shipcall_post_last_insert_id()->str:
        """alias function for get_last_insert_id. May be deleted soon"""
        query = SQLQuery.get_last_insert_id()
        return query

    @staticmethod
    def get_shipcall_post_update_shipcall_participant_map()->str:
        """Insert one row into shipcall_participant_map (placeholders: ?shipcall_id?, ?participant_id?, ?type?)."""
        query = "INSERT INTO shipcall_participant_map (shipcall_id, participant_id, type) VALUES (?shipcall_id?, ?participant_id?, ?type?)"
        return query

    @staticmethod
    def create_sql_query_history_post()->str:
        """Return the query of the module-level function create_sql_query_history_post()."""
        # inside a method body the bare name resolves to the module-level function, not to this method
        query = create_sql_query_history_post()
        return query

    @staticmethod
    def get_shipcall_by_id()->str:
        """Select one shipcall by its id (placeholder: ?id?)."""
        query = "SELECT * FROM shipcall where id = ?id?"
        return query

    @staticmethod
    def get_shipcall_put(schemaModel:dict)->str:
        """
        Build the UPDATE statement for a shipcall from the keys of `schemaModel`.

        Keys listed in _SHIPCALL_SKIPPED_KEYS are left out. The 'type' column is bound to
        the ?type_value? placeholder. Yields the same query as
        create_sql_query_shipcall_put(schemaModel).
        """
        prefix = "UPDATE shipcall SET "
        suffix = "WHERE id = ?id?"
        body = ", ".join([
            f"{key} = ?{SQLQuery._SHIPCALL_PLACEHOLDER_NAMES.get(key, key)}? "
            for key in schemaModel
            if key not in SQLQuery._SHIPCALL_SKIPPED_KEYS
        ])

        query = prefix + body + suffix
        return query

    @staticmethod
    def get_shipcall_participant_map_by_shipcall_id()->str:
        """Select the shipcall_participant_map rows of one shipcall (placeholder: ?id?)."""
        query = "SELECT id, participant_id, type FROM shipcall_participant_map where shipcall_id = ?id?"
        return query

    @staticmethod
    def get_shipcall_participant_map_by_shipcall_id_and_type()->str:
        """Select the shipcall_participant_map rows of one shipcall and type (placeholders: ?id?, ?type?)."""
        query = "SELECT id, participant_id FROM shipcall_participant_map where (shipcall_id = ?id? AND type=?type?)"
        return query

    @staticmethod
    def get_shipcall_participant_map_delete_by_id()->str:
        """Delete one shipcall_participant_map row (placeholder: ?existing_id?)."""
        query = "DELETE FROM shipcall_participant_map WHERE id = ?existing_id?"
        return query

    @staticmethod
    def create_sql_query_history_put()->str:
        """Return the query of the module-level function create_sql_query_history_put()."""
        # inside a method body the bare name resolves to the module-level function, not to this method
        query = create_sql_query_history_put()
        return query

    @staticmethod
    def get_ship_post(schemaModel:dict)->str:
        """
        Build the INSERT statement for a new ship from the keys of `schemaModel`.

        The keys 'id', 'created' and 'modified' are left out.
        Yields the same query as create_sql_query_ship_post(schemaModel).
        """
        columns = [key for key in schemaModel if key not in SQLQuery._SHIP_SKIPPED_KEYS]
        placeholders = [f"?{key}?" for key in columns]

        query = "INSERT INTO ship (" + ",".join(columns) + ") VALUES (" + ",".join(placeholders) + ")"
        return query

    @staticmethod
    def get_ship_put(schemaModel:dict)->str:
        """
        Build the UPDATE statement for a ship from the keys of `schemaModel`.

        The keys 'id', 'created' and 'modified' are left out of the SET clause.
        Yields the same query as create_sql_query_ship_put(schemaModel).
        """
        prefix = "UPDATE ship SET "
        suffix = "WHERE id = ?id?"
        body = ", ".join([f"{key} = ?{key}? " for key in schemaModel if key not in SQLQuery._SHIP_SKIPPED_KEYS])

        query = prefix + body + suffix
        return query

    @staticmethod
    def get_ship_delete_by_id()->str:
        """Mark one ship as deleted by setting deleted = 1 (placeholder: ?id?)."""
        query = "UPDATE ship SET deleted = 1 WHERE id = ?id?"
        return query

    @staticmethod
    def get_notification_post()->str:
        """
        Not implemented yet.

        raises:
            NotImplementedError: always.
        """
        # TODO: provide the INSERT query for notifications. The previous draft was only a proxy,
        # which inserted into shipcall_participant_map, and was never reachable.
        raise NotImplementedError()

    @staticmethod
    def get_shipcall_put_notification_state()->str:
        """
        Not implemented yet.

        raises:
            NotImplementedError: always.
        """
        # TODO: use evaluation_notifications_sent here and consider only the shipcall_id
        # TODO: query
        raise NotImplementedError()
|
|
|
|
|