import logging import pydapper import smtplib import json import os from email.message import EmailMessage from BreCal.schemas import model, defs from BreCal.local_db import getPoolConnection from BreCal.database.update_database import evaluate_shipcall_state from BreCal.database.sql_queries import create_sql_query_shipcall_get from BreCal.database.sql_queries import SQLQuery from BreCal.database.sql_utils import get_notification_for_shipcall_and_type from BreCal.services.email_handling import EmailHandler import threading import schedule import time options = {'past_days':7} def UpdateShipcalls(options:dict = {'past_days':2}): """ performs a single update step of every shipcall that matches the criteria. 1.) opens an SQL connection 2.) queries every shipcall that fulfills the {options} criteria 3.) iterates over each shipcall 4.) closes the SQL connection options: key: 'past_days'. Is used to execute a filtered query of all available shipcalls. Defaults to 2 (days) """ pooledConnection = None try: pooledConnection = getPoolConnection() commands = pydapper.using(pooledConnection) # obtain data from the MYSQL database (uses 'options' to filter the resulting data by the ETA, considering those entries of 'past_days'-range) query = create_sql_query_shipcall_get(options) data = commands.query(query, model=model.Shipcall) data = [s for s in data if not s.canceled] # filter out canceled shipcalls # get the shipcall ids, which are of interest shipcall_ids = [dat.id for dat in data] # iterate over each shipcall id and (re-) evaluate it for shipcall_id in shipcall_ids: # apply 'Traffic Light' evaluation to obtain 'GREEN', 'YELLOW' or 'RED' evaluation state. The function internally updates the mysql database evaluate_shipcall_state(mysql_connector_instance=pooledConnection, shipcall_id=shipcall_id) # new_id (last insert id) refers to the shipcall id except Exception as ex: logging.error(ex) finally: if pooledConnection is not None: pooledConnection.close() return def UpdateNotifications(cooldown_in_mins:int=10): """ This function evaluates all notifications in state ("level") 0 which have been recently created. If a specified amount of time has passed the notification is updated to state 1 and a notification is received by the user """ pooledConnection = None try: pooledConnection = getPoolConnection() commands = pydapper.using(pooledConnection) query = f"SELECT * FROM notification WHERE level = 0 AND created < TIMESTAMP(NOW() - INTERVAL {cooldown_in_mins} MINUTE)" data = commands.query(query, model=model.Notification) for notification in data: commands.execute("UPDATE notification SET level = 1 WHERE id = ?id?", param={"id":notification.id}) except Exception as ex: logging.error(ex) finally: if pooledConnection is not None: pooledConnection.close() def ClearNotifications(max_age_in_days:int=3): """ This function clears all notifications in state ("level") 2 that are older than x days """ pooledConnection = None try: pooledConnection = getPoolConnection() commands = pydapper.using(pooledConnection) query = f"DELETE FROM notification WHERE level = 2 and created < TIMESTAMP(NOW() - INTERVAL {max_age_in_days} DAY)" result = commands.execute(query) if(result > 0): logging.info(f"Deleted {result} notifications") except Exception as ex: logging.error(ex) finally: if pooledConnection is not None: pooledConnection.close() def SendEmails(email_dict): """ This function sends emails to all users in the emaildict """ pooledConnection = None conn = None try: pooledConnection = getPoolConnection() commands = pydapper.using(pooledConnection) conn = smtplib.SMTP(defs.email_credentials["server"], defs.email_credentials["port"]) conn.set_debuglevel(1) # set this to 0 to disable debug output to log conn.ehlo() conn.starttls() conn.ehlo() conn.login(defs.email_credentials["sender"], defs.email_credentials["password_send"]) current_path = os.path.dirname(os.path.abspath(__file__)) if not defs.message_types: f = open(os.path.join(current_path,"../msg/msg_types.json"), encoding='utf-8'); defs.message_types = json.load(f) f.close() for user, notifications in email_dict.items(): msg = EmailMessage() msg["Subject"] = '[Bremen calling] Notification' msg["From"] = defs.email_credentials["sender"] msg["To"] = user.user_email with open(os.path.join(current_path,'../msg/notification_template.html'), mode="r", encoding="utf-8") as file: body = file.read() replacement = "" for notification in notifications: message_type = next((x for x in defs.message_types if x["type"] == notification.type), None) if message_type is None: logging.error(f"Message type {notification.type} not found") continue with open(os.path.join(current_path,'../msg/notification_element.html'), mode="r", encoding="utf-8") as file: element = file.read() element = element.replace("[[color]]", message_type["color"]) element = element.replace("[[link]]", message_type["link"]) # We want to show the following information for each notification: # Ship-name, Arr/Dep/Shift, ETA/ETD, berth sentinel = object() shipcall = commands.query_single_or_default("SELECT * FROM shipcall WHERE id = ?id?", sentinel, model=model.Shipcall, param={"id":notification.shipcall_id}) if shipcall is sentinel: logging.error(f"Shipcall with id {notification.shipcall_id} not found") continue shipcall_type = defs.shipcall_types[shipcall.type] eta_text = shipcall.eta.strftime("%d.%m.%Y %H:%M") if shipcall.type == 1 else shipcall.etd.strftime("%d.%m.%Y %H:%M") ship = commands.query_single_or_default("SELECT * FROM ship WHERE id = ?id?", sentinel, model=model.Ship, param={"id":shipcall.ship_id}) if ship is sentinel: logging.error(f"Ship with id {shipcall.ship_id} not found") continue berth_id = shipcall.arrival_berth_id if shipcall.type == 1 else shipcall.departure_berth_id berth = commands.query_single_or_default("SELECT * FROM berth WHERE id = ?id?", sentinel, model=model.Berth, param={"id":berth_id}) berth_text = "" if berth is not sentinel: berth_text = berth.name times = commands.query_single_or_default("SELECT * FROM times WHERE shipcall_id = ?id? and participant_type = 8", sentinel, model=model.Times, param={"id":notification.shipcall_id}) if times is not sentinel: eta_text = times.eta_berth.strftime("%d.%m.%Y %H:%M") if shipcall.type == 1 else times.etd_berth.strftime("%d.%m.%Y %H:%M") text = f"{ship.name} ({shipcall_type}) - {eta_text} - {berth_text}" element = element.replace("[[text]]", text) element = element.replace("[[notification_text]]", message_type["msg_text"]) replacement += element body = body.replace("[[NOTIFICATION_ELEMENTS]]", replacement) msg.set_content(body, subtype='html', charset='utf-8', cte='8bit') conn.sendmail(defs.email_credentials["sender"], user.user_email, msg.as_string()) except Exception as ex: logging.error(ex) finally: if conn is not None: conn.quit() if pooledConnection is not None: pooledConnection.close() def SendNotifications(): # perhaps this will be moved somewhere else later pooledConnection = None try: # find all notifications in level 1 pooledConnection = getPoolConnection() query = "SELECT * from notification WHERE level = 1" commands = pydapper.using(pooledConnection) data = commands.query(query, model=model.Notification) if len(data) == 0: return # cache participants and users for performance beforehand query = "SELECT * from participant"; participants = commands.query(query, model=model.Participant) email_dict = dict() users_dict = dict() user_query = "SELECT * from user" users = commands.query(user_query, model=model.User) for participant in participants: for user in users: if user.participant_id == participant.id: if not participant.id in users_dict: users_dict[participant.id] = [] users_dict[participant.id].append(user) # break for notification in data: if not notification.participant_id: # no participant defined, this update goes to all participants of this shipcall p_query = "SELECT * from shipcall_participant_map where shipcall_id = ?id?" assigned_participants = commands.query(p_query, model=model.ShipcallParticipantMap, param={"id":notification.shipcall_id}) for assigned_participant in assigned_participants: users = users_dict[assigned_participant.participant_id] for user in users: # send notification to user if user.notify_email: if user not in email_dict: email_dict[user] = [] email_dict[user].append(notification) if user.notify_whatsapp: # TBD pass if user.notify_signal: # TBD pass else: users = users_dict[notification.participant_id] for user in users: # send notification to user if user.notify_email and user.wants_notifications(notification.type): if user not in email_dict: email_dict[user] = [] email_dict[user].append(notification) if user.notify_whatsapp and user.wants_notifications(notification.type): # TBD pass if user.notify_signal and user.wants_notifications(notification.type): # TBD pass # mark as sent commands.execute("UPDATE notification SET level = 2 WHERE id = ?id?", param={"id":notification.id}) # send emails (if any) if len(email_dict) > 0: SendEmails(email_dict) except Exception as ex: logging.error(ex) finally: if pooledConnection is not None: pooledConnection.close() def add_function_to_schedule__update_shipcalls(interval_in_minutes:int, options:dict={'past_days':2}): kwargs_ = {"options":options} schedule.every(interval_in_minutes).minutes.do(UpdateShipcalls, **kwargs_) return def add_function_to_evaluate_notifications(interval_in_minutes:int=1): schedule.every(1).minutes.do(UpdateNotifications, interval_in_minutes) return def add_function_to_clear_notifications(interval_in_days:int=3): schedule.every(30).minutes.do(ClearNotifications, interval_in_days) return def add_function_to_schedule_send_notifications(interval_in_minutes:int=1): schedule.every(interval_in_minutes).minutes.do(SendNotifications) return def eval_next_24_hrs(): pooledConnection = None try: pooledConnection = getPoolConnection() commands = pydapper.using(pooledConnection) query = SQLQuery.get_next24hrs_shipcalls() data = commands.query(query) nquery = "INSERT INTO notification (shipcall_id, participant_id, level, type, message) VALUES (?shipcall_id?, ?participant_id?, 0, 2, ?message?)" for shipcall in data: existing_notifications = get_notification_for_shipcall_and_type(shipcall["id"], 2) query = SQLQuery.get_shipcall_participant_map_by_shipcall_id() participants = commands.query(query, model=dict, param={"id":shipcall["id"]}) for participant in participants: if participant["type"] == 1: # BSMD continue # if participant["type"] == 32: # PORT AUTHORITY # Christin: Brake möchte sie vielleicht doch haben # continue # check if "open" notification already exists found_notification = False for existing_notification in existing_notifications: if existing_notification["participant_id"] == participant["id"] and existing_notification["level"] == 0: found_notification = True break if not found_notification: commands.execute(nquery, param={"shipcall_id":shipcall["id"], "participant_id": participant["participant_id"], "message":shipcall["name"]}) except Exception as ex: logging.error(ex) finally: if pooledConnection is not None: pooledConnection.close() return def setup_schedule(update_shipcalls_interval_in_minutes:int=60): logging.getLogger('schedule').setLevel(logging.INFO); # set the logging level of the schedule module to INFO schedule.clear() # clear all routine jobs. This prevents jobs from being created multiple times # update the evaluation state in every recent shipcall add_function_to_schedule__update_shipcalls(update_shipcalls_interval_in_minutes) add_function_to_evaluate_notifications(defs.NOTIFICATION_COOLDOWN_MINS) add_function_to_clear_notifications(defs.NOTIFICATION_MAX_AGE_DAYS) schedule.every().day.at("09:00").do(eval_next_24_hrs) add_function_to_schedule_send_notifications(1) return def run_schedule_permanently(latency:int=30): """permanently runs the scheduler. Any time a routine task is 'due', it will be executed. There is an interval between each iteration to reduce workload""" while True: schedule.run_pending() time.sleep(latency) return def run_schedule_permanently_in_background(latency:int=30, thread_name:str="run_schedule_in_background"): """create a (daemon) thread in the background, which permanently executes the scheduled functions""" thread_names = [thread_.name for thread_ in threading.enumerate()] if not thread_name in thread_names: bg_thread = threading.Thread(target=run_schedule_permanently, name=thread_name, kwargs={"latency":latency}) bg_thread.daemon = True # 'daemon': stops, when Python's main thread stops. bg_thread.start() return