import logging def create_sql_query_shipcall_get(options:dict)->str: """ creates an SQL query, which selects all shipcalls from the mysql database. the agency eta times are used to order the entries. args: options : dict. A dictionary, which must contains the 'past_days' key (int). Determines the range by which shipcalls are filtered. """ if "participant_id" not in options: # if no participant_id is given, all shipcalls are selected query = ("SELECT s.id as id, ship_id, port_id, type, eta, voyage, etd, arrival_berth_id, departure_berth_id, tug_required, pilot_required, " + "flags, s.pier_side, bunkering, replenishing_terminal, replenishing_lock, draft, tidal_window_from, " + "tidal_window_to, rain_sensitive_cargo, recommended_tugs, anchored, moored_lock, canceled, evaluation, " + "evaluation_message, evaluation_time, evaluation_notifications_sent, s.created as created, s.modified as modified, time_ref_point " + "FROM shipcall s " + "LEFT JOIN times t ON t.shipcall_id = s.id AND t.participant_type = 8 " + "WHERE " + "(type = 1 AND " + "((t.id IS NOT NULL AND t.eta_berth >= DATE(NOW() - INTERVAL %d DAY)) OR " + "(eta >= DATE(NOW() - INTERVAL %d DAY)))) OR " + "((type = 2 OR type = 3) AND " + "((t.id IS NOT NULL AND t.etd_berth >= DATE(NOW() - INTERVAL %d DAY)) OR " + "(etd >= DATE(NOW() - INTERVAL %d DAY)))) " + "ORDER BY eta") % (options["past_days"], options["past_days"], options["past_days"], options["past_days"]) else: query = ("SELECT s.id as id, ship_id, port_id, type, eta, voyage, etd, arrival_berth_id, departure_berth_id, tug_required, pilot_required, " + "flags, s.pier_side, bunkering, replenishing_terminal, replenishing_lock, draft, tidal_window_from, " + "tidal_window_to, rain_sensitive_cargo, recommended_tugs, anchored, moored_lock, canceled, evaluation, " + "evaluation_message, evaluation_time, evaluation_notifications_sent, s.created as created, s.modified as modified, time_ref_point " + "FROM shipcall s " + "LEFT JOIN times t ON t.shipcall_id = s.id AND t.participant_type = 8 " + "WHERE " + "port_id in (SELECT port_id FROM participant_port_map WHERE participant_id = %d)" + " AND (" + "(type = 1 AND " + "((t.id IS NOT NULL AND t.eta_berth >= DATE(NOW() - INTERVAL %d DAY)) OR " + "(eta >= DATE(NOW() - INTERVAL %d DAY)))) OR " + "((type = 2 OR type = 3) AND " + "((t.id IS NOT NULL AND t.etd_berth >= DATE(NOW() - INTERVAL %d DAY)) OR " + "(etd >= DATE(NOW() - INTERVAL %d DAY))))) " + "ORDER BY eta") % (options["participant_id"], options["past_days"], options["past_days"], options["past_days"], options["past_days"]) return query def create_sql_query_shipcall_post(schemaModel:dict)->str: query = "INSERT INTO shipcall (" isNotFirst = False for key in schemaModel.keys(): if key == "id": continue if key == "participants": continue if key == "created": continue if key == "modified": continue if key == "evaluation": continue if key == "evaluation_message": continue if key == "type_value": continue if key == "evaluation_value": continue if isNotFirst: query += "," isNotFirst = True query += key query += ") VALUES (" isNotFirst = False for key in schemaModel.keys(): param_key = key if key == "id": continue if key == "participants": continue if key == "created": continue if key == "modified": continue if key == "evaluation": continue if key == "evaluation_message": continue if key == "type": param_key = "type_value" if key == "type_value": continue if key == "evaluation": param_key = "evaluation_value" if key == "evaluation_value": continue if isNotFirst: query += "," isNotFirst = True query += "?" + param_key + "?" query += ")" return query def create_sql_query_shipcall_put(schemaModel:dict)->str: query = "UPDATE shipcall SET " isNotFirst = False for key in schemaModel.keys(): param_key = key if key == "id": continue if key == "participants": continue if key == "created": continue if key == "modified": continue if key == "evaluation": continue if key == "evaluation_message": continue if key == "type": param_key = "type_value" if key == "type_value": continue if key == "evaluation": param_key = "evaluation_value" if key == "evaluation_value": continue if isNotFirst: query += ", " isNotFirst = True query += key + " = ?" + param_key + "? " query += "WHERE id = ?id?" return query def create_sql_query_history_post()->str: query = "INSERT INTO history (participant_id, shipcall_id, user_id, timestamp, eta, type, operation) VALUES (?pid?, ?scid?, ?uid?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, 1, 1)" return query def create_sql_query_history_put()->str: query = "INSERT INTO history (participant_id, shipcall_id, user_id, timestamp, eta, type, operation) VALUES (?pid?, ?scid?, ?uid?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, 1, 2)" return query def create_sql_query_user_put(schemaModel:dict): query = "UPDATE user SET " isNotFirst = False for key in schemaModel.keys(): if key == "id": continue if key == "old_password": continue if key == "new_password": continue if isNotFirst: query += ", " isNotFirst = True query += key + " = ?" + key + "? " query += "WHERE id = ?id?" return query def create_sql_query_ship_post(schemaModel:dict): query = "INSERT INTO ship (" isNotFirst = False for key in schemaModel.keys(): if key == "id": continue if key == "created": continue if key == "modified": continue if isNotFirst: query += "," isNotFirst = True query += key query += ") VALUES (" isNotFirst = False for key in schemaModel.keys(): if key == "id": continue if key == "created": continue if key == "modified": continue if isNotFirst: query += "," isNotFirst = True query += "?" + key + "?" query += ")" return query def create_sql_query_ship_put(schemaModel:dict): query = "UPDATE ship SET " isNotFirst = False for key in schemaModel.keys(): if key == "id": continue if key == "created": continue if key == "modified": continue if isNotFirst: query += ", " isNotFirst = True query += key + " = ?" + key + "? " query += "WHERE id = ?id?" return query class SQLQuery(): """ This class provides quick access to different SQL query functions, which creates default queries for the BreCal package. Each method is callable without initializing the SQLQuery object. Example: SQLQuery.get_berths() When the data violates one of the rules, a marshmallow.ValidationError is raised, which details the issues. """ def __init__(self) -> None: pass @staticmethod def get_berth()->str: query = "SELECT id, name, `lock`, owner_id, authority_id, created, modified, deleted FROM berth WHERE deleted = 0 ORDER BY name" return query @staticmethod def get_history()->str: query = "SELECT id, participant_id, shipcall_id, timestamp, eta, type, operation FROM history WHERE shipcall_id = ?shipcallid?" return query @staticmethod def get_user()->str: query = "SELECT id, participant_id, first_name, last_name, user_name, user_email, user_phone, password_hash, " +\ "api_key, notify_email, notify_whatsapp, notify_signal, notify_popup, notify_event, created, modified FROM user " +\ "WHERE user_name = ?username? OR user_email = ?username?" return query @staticmethod def get_notifications()->str: query = "SELECT id, shipcall_id, level, type, message, created, modified FROM notification " + \ "WHERE shipcall_id = ?scid?" return query @staticmethod def get_participant_by_user_id()->str: query = "SELECT p.id as id, p.name as name, p.street as street, p.postal_code as postal_code, p.city as city, p.type as type, p.flags as flags, p.created as created, p.modified as modified, p.deleted as deleted FROM participant p INNER JOIN user u WHERE u.participant_id = p.id and u.id = ?userid?" return query @staticmethod def get_participants()->str: query = "SELECT id, name, street, postal_code, city, type, flags, created, modified, deleted FROM participant p ORDER BY p.name" return query @staticmethod def get_shipcalls(options:dict={'past_days':3})->str: # a pytest proves this method to be identical to create_sql_query_shipcall_get(options) assert 'past_days' in list(options.keys()), f"there must be a key 'past_days' in the options, which determines, how recent the returned list of shipcalls shall be." # part of a pytest.raises past_days = options['past_days'] query = ("SELECT s.id as id, ship_id, type, eta, voyage, etd, arrival_berth_id, departure_berth_id, tug_required, pilot_required, " + \ "flags, s.pier_side, bunkering, replenishing_terminal, replenishing_lock, draft, tidal_window_from, " + \ "tidal_window_to, rain_sensitive_cargo, recommended_tugs, anchored, moored_lock, canceled, evaluation, " + \ "evaluation_message, evaluation_time, evaluation_notifications_sent, s.created as created, s.modified as modified, time_ref_point " + \ "FROM shipcall s " + \ "LEFT JOIN times t ON t.shipcall_id = s.id AND t.participant_type = 8 " + \ "WHERE " + \ "(type = 1 AND " + \ f"((t.id IS NOT NULL AND t.eta_berth >= DATE(NOW() - INTERVAL {past_days} DAY)) OR " + \ f"(eta >= DATE(NOW() - INTERVAL {past_days} DAY)))) OR " + \ "((type = 2 OR type = 3) AND " + \ f"((t.id IS NOT NULL AND t.etd_berth >= DATE(NOW() - INTERVAL {past_days} DAY)) OR " + \ f"(etd >= DATE(NOW() - INTERVAL {past_days} DAY)))) " + \ "ORDER BY eta") return query def get_next24hrs_shipcalls()->str: query = ("SELECT s.id as id, ship.name as name FROM shipcall s INNER JOIN ship ON s.ship_id = ship.id LEFT JOIN times t on t.shipcall_id = s.id AND t.participant_type = 8 " + \ "WHERE (type = 1 AND ((t.id IS NOT NULL AND t.eta_berth >= NOW() AND t.eta_berth < (NOW() + INTERVAL 1 DAY))" + \ "OR (eta >= NOW() AND eta < (NOW() + INTERVAL 1 DAY)))) OR " + \ "((type = 2 OR type = 3) AND ((t.id IS NOT NULL AND t.etd_berth >= NOW() AND " + \ "t.etd_berth < (NOW() + INTERVAL 1 DAY)) OR (etd >= NOW() AND etd < (NOW() + INTERVAL 1 DAY))))" + \ "AND s.canceled = 0") return query @staticmethod def get_ships()->str: query = "SELECT id, name, imo, callsign, participant_id, length, width, is_tug, bollard_pull, eni, created, modified, deleted FROM ship ORDER BY name" return query @staticmethod def get_ship_by_id()->str: query = "SELECT * FROM ship where id = ?id?" return query @staticmethod def get_times()->str: query = "SELECT id, eta_berth, eta_berth_fixed, etd_berth, etd_berth_fixed, lock_time, lock_time_fixed, " + \ "zone_entry, zone_entry_fixed, operations_start, operations_end, remarks, shipcall_id, participant_id, " + \ "berth_id, berth_info, pier_side, participant_type, created, modified, ata, atd, eta_interval_end, etd_interval_end FROM times " + \ "WHERE times.shipcall_id = ?scid?" return query @staticmethod def get_user_by_id(): query = "SELECT * FROM user where id = ?id?" return query @staticmethod def get_user_put(schemaModel:dict): # a pytest proves this method to be identical to create_sql_query_user_put(schemaModel) prefix = "UPDATE user SET " suffix = "WHERE id = ?id?" center = [f"{key} = ?{key}? " for key in schemaModel.keys() if key not in ["id", "old_password", "new_password"]] query = prefix + ", ".join(center) + suffix return query @staticmethod def get_update_user_password()->str: query = "UPDATE user SET password_hash = ?password_hash? WHERE id = ?id?" return query @staticmethod def get_participants()->str: query = "SELECT participant_id, type FROM shipcall_participant_map WHERE shipcall_id=?shipcall_id?" return query @staticmethod def get_participant_from_id()->str: query = "SELECT id, type, flags FROM participant WHERE id=?participant_id?" return query @staticmethod def get_shipcall_post(schemaModel:dict)->str: # a pytest proves this method to be identical to create_sql_query_shipcall_post(schemaModel) param_keys = {key:key for key in schemaModel.keys()} param_keys["type"] = "type_value" param_keys["evaluation"] = "evaluation_value" prefix = "INSERT INTO shipcall (" bridge = ") VALUES (" suffix = ")" stage1 = ",".join([key for key in schemaModel.keys() if not key in ["id","participants","created","modified","evaluation","evaluation_message","type_value","evaluation_value"]]) stage2 = ",".join([f"?{param_keys.get(key)}?" for key in schemaModel.keys() if not key in ["id","participants","created","modified","evaluation","evaluation_message","type_value","evaluation_value"]]) query = prefix+stage1+bridge+stage2+suffix return query @staticmethod def get_last_insert_id()->str: query = "select last_insert_id()" return query @staticmethod def get_shipcall_post_last_insert_id()->str: """alias function. May be deleted soon""" query = SQLQuery.get_last_insert_id() return query @staticmethod def get_shipcall_post_update_shipcall_participant_map()->str: query = "INSERT INTO shipcall_participant_map (shipcall_id, participant_id, type) VALUES (?shipcall_id?, ?participant_id?, ?type?)" return query @staticmethod def create_sql_query_history_post()->str: query = create_sql_query_history_post() return query @staticmethod def get_shipcall_by_id()->str: query = "SELECT * FROM shipcall where id = ?id?" return query @staticmethod def get_shipcall_put(schemaModel:dict)->str: # a pytest proves this method to be identical to create_sql_query_shipcall_put(schemaModel) param_keys = {key:key for key in schemaModel.keys()} param_keys["type"] = "type_value" param_keys["evaluation"] = "evaluation_value" prefix = "UPDATE shipcall SET " suffix = "WHERE id = ?id?" body = ", ".join([f"{key} = ?{param_keys.get(key)}? " for key in schemaModel.keys() if key not in ["id", "participants", "created", "modified", "evaluation", "evaluation_message", "type_value", "evaluation_value"]]) query = prefix + body + suffix return query @staticmethod def get_shipcall_participant_map_by_shipcall_id()->str: query = "SELECT id, participant_id, type FROM shipcall_participant_map where shipcall_id = ?id?" return query @staticmethod def get_shipcall_participant_map_by_shipcall_id_and_type()->str: query = "SELECT id, participant_id FROM shipcall_participant_map where (shipcall_id = ?id? AND type=?type?)" return query @staticmethod def get_shipcall_participant_map_delete_by_id()->str: query = "DELETE FROM shipcall_participant_map WHERE id = ?existing_id?" return query @staticmethod def create_sql_query_history_put()->str: query = create_sql_query_history_put() return query @staticmethod def get_ship_post(schemaModel:dict)->str: # a pytest proves this method to be identical to create_sql_query_ship_post(schemaModel) prefix = "INSERT INTO ship (" suffix = ")" bridge = ") VALUES (" stage1 = ",".join([key for key in schemaModel.keys() if not key in ["id", "created", "modified"]]) stage2 = ",".join([f"?{key}?" for key in schemaModel.keys() if not key in ["id", "created", "modified"]]) query = prefix + stage1 + bridge + stage2 + suffix return query @staticmethod def get_ship_put(schemaModel:dict)->str: # a pytest proves this method to be identical to create_sql_query_ship_put(schemaModel) prefix = "UPDATE ship SET " suffix = "WHERE id = ?id?" body = ", ".join([f"{key} = ?{key}? " for key in schemaModel.keys() if not key in ["id","created","modified"]]) query = prefix + body + suffix return query @staticmethod def get_ship_delete_by_id()->str: query = "UPDATE ship SET deleted = 1 WHERE id = ?id?" return query @staticmethod def get_notification_post()->str: raise NotImplementedError() # #TODO: this query is wrong and just a proxy for a POST request query = "INSERT INTO shipcall_participant_map (shipcall_id, participant_id, type) VALUES (?shipcall_id?, ?participant_id?, ?type?)" return query @staticmethod def get_shipcall_put_notification_state()->str: raise NotImplementedError() # #TODO: use evaluation_notifications_sent here and consider only the shipcall_id # #TODO: query query = ... return query