350 lines
15 KiB
Python
350 lines
15 KiB
Python
import logging
|
|
import pydapper
|
|
import smtplib
|
|
import json
|
|
import os
|
|
from email.message import EmailMessage
|
|
|
|
from BreCal.schemas import model, defs
|
|
from BreCal.local_db import getPoolConnection
|
|
from BreCal.database.update_database import evaluate_shipcall_state
|
|
from BreCal.database.sql_queries import create_sql_query_shipcall_get
|
|
from BreCal.database.sql_queries import SQLQuery
|
|
from BreCal.database.sql_utils import get_notification_for_shipcall_and_type
|
|
from BreCal.services.email_handling import EmailHandler
|
|
|
|
import threading
|
|
import schedule
|
|
import time
|
|
|
|
options = {'past_days':7}
|
|
|
|
def UpdateShipcalls(options:dict = {'past_days':2}):
|
|
"""
|
|
performs a single update step of every shipcall that matches the criteria.
|
|
|
|
1.) opens an SQL connection
|
|
2.) queries every shipcall that fulfills the {options} criteria
|
|
3.) iterates over each shipcall
|
|
4.) closes the SQL connection
|
|
|
|
options:
|
|
key: 'past_days'. Is used to execute a filtered query of all available shipcalls. Defaults to 2 (days)
|
|
"""
|
|
try:
|
|
pooledConnection = getPoolConnection()
|
|
commands = pydapper.using(pooledConnection)
|
|
|
|
# obtain data from the MYSQL database (uses 'options' to filter the resulting data by the ETA, considering those entries of 'past_days'-range)
|
|
query = create_sql_query_shipcall_get(options)
|
|
data = commands.query(query, model=model.Shipcall)
|
|
|
|
data = [s for s in data if not s.canceled] # filter out canceled shipcalls
|
|
|
|
# get the shipcall ids, which are of interest
|
|
shipcall_ids = [dat.id for dat in data]
|
|
|
|
# iterate over each shipcall id and (re-) evaluate it
|
|
for shipcall_id in shipcall_ids:
|
|
# apply 'Traffic Light' evaluation to obtain 'GREEN', 'YELLOW' or 'RED' evaluation state. The function internally updates the mysql database
|
|
evaluate_shipcall_state(mysql_connector_instance=pooledConnection, shipcall_id=shipcall_id) # new_id (last insert id) refers to the shipcall id
|
|
|
|
pooledConnection.close()
|
|
|
|
except Exception as ex:
|
|
logging.error(ex)
|
|
return
|
|
|
|
def UpdateNotifications(cooldown_in_mins:int=10):
|
|
"""
|
|
This function evaluates all notifications in state ("level") 0 which have been recently created. If a specified amount of time has passed the
|
|
notification is updated to state 1 and a notification is received by the user
|
|
"""
|
|
|
|
try:
|
|
pooledConnection = getPoolConnection()
|
|
commands = pydapper.using(pooledConnection)
|
|
|
|
query = f"SELECT * FROM notification WHERE level = 0 AND created < TIMESTAMP(NOW() - INTERVAL {cooldown_in_mins} MINUTE)"
|
|
data = commands.query(query, model=model.Notification)
|
|
for notification in data:
|
|
commands.execute("UPDATE notification SET level = 1 WHERE id = ?id?", param={"id":notification.id})
|
|
|
|
pooledConnection.close()
|
|
except Exception as ex:
|
|
logging.error(ex)
|
|
|
|
def ClearNotifications(max_age_in_days:int=3):
|
|
"""
|
|
This function clears all notifications in state ("level") 2 that are older than x days
|
|
"""
|
|
|
|
try:
|
|
pooledConnection = getPoolConnection()
|
|
commands = pydapper.using(pooledConnection)
|
|
|
|
query = f"DELETE FROM notification WHERE level = 2 and created < TIMESTAMP(NOW() - INTERVAL {max_age_in_days} DAY)"
|
|
result = commands.execute(query)
|
|
pooledConnection.close()
|
|
if(result > 0):
|
|
logging.info(f"Deleted {result} notifications")
|
|
|
|
except Exception as ex:
|
|
logging.error(ex)
|
|
|
|
def SendEmails(email_dict):
|
|
"""
|
|
This function sends emails to all users in the emaildict
|
|
"""
|
|
try:
|
|
pooledConnection = getPoolConnection()
|
|
commands = pydapper.using(pooledConnection)
|
|
|
|
conn = smtplib.SMTP(defs.email_credentials["server"], defs.email_credentials["port"])
|
|
conn.set_debuglevel(1) # set this to 0 to disable debug output to log
|
|
conn.ehlo()
|
|
conn.starttls()
|
|
conn.ehlo()
|
|
conn.login(defs.email_credentials["sender"], defs.email_credentials["password_send"])
|
|
|
|
current_path = os.path.dirname(os.path.abspath(__file__))
|
|
|
|
if not defs.message_types:
|
|
f = open(os.path.join(current_path,"../msg/msg_types.json"), encoding='utf-8');
|
|
defs.message_types = json.load(f)
|
|
f.close()
|
|
|
|
for user_email, notifications in email_dict.items():
|
|
msg = EmailMessage()
|
|
msg["Subject"] = '[Bremen calling] Notification'
|
|
msg["From"] = defs.email_credentials["sender"]
|
|
msg["To"] = user_email
|
|
|
|
with open(os.path.join(current_path,'../msg/notification_template.html'), mode="r", encoding="utf-8") as file:
|
|
body = file.read()
|
|
|
|
replacement = ""
|
|
|
|
for notification in notifications:
|
|
|
|
message_type = next((x for x in defs.message_types if x["type"] == notification.type), None)
|
|
if message_type is None:
|
|
logging.error(f"Message type {notification.type} not found")
|
|
continue
|
|
|
|
with open(os.path.join(current_path,'../msg/notification_element.html'), mode="r", encoding="utf-8") as file:
|
|
element = file.read()
|
|
element = element.replace("[[color]]", message_type["color"])
|
|
element = element.replace("[[link]]", message_type["link"])
|
|
|
|
# We want to show the following information for each notification:
|
|
# Ship-name, Arr/Dep/Shift, ETA/ETD, berth
|
|
sentinel = object()
|
|
shipcall = commands.query_single_or_default("SELECT * FROM shipcall WHERE id = ?id?", sentinel, model=model.Shipcall, param={"id":notification.shipcall_id})
|
|
if shipcall is sentinel:
|
|
logging.error(f"Shipcall with id {notification.shipcall_id} not found")
|
|
continue
|
|
shipcall_type = defs.shipcall_types[shipcall.type]
|
|
eta_text = shipcall.eta.strftime("%d.%m.%Y %H:%M") if shipcall.type == 1 else shipcall.etd.strftime("%d.%m.%Y %H:%M")
|
|
|
|
ship = commands.query_single_or_default("SELECT * FROM ship WHERE id = ?id?", sentinel, model=model.Ship, param={"id":shipcall.ship_id})
|
|
if ship is sentinel:
|
|
logging.error(f"Ship with id {shipcall.ship_id} not found")
|
|
continue
|
|
berth_id = shipcall.arrival_berth_id if shipcall.type == 1 else shipcall.departure_berth_id
|
|
berth = commands.query_single_or_default("SELECT * FROM berth WHERE id = ?id?", sentinel, model=model.Berth, param={"id":berth_id})
|
|
berth_text = ""
|
|
if berth is not sentinel:
|
|
berth_text = berth.name
|
|
times = commands.query_single_or_default("SELECT * FROM times WHERE shipcall_id = ?id? and participant_type = 8", sentinel, model=model.Times, param={"id":notification.shipcall_id})
|
|
if times is not sentinel:
|
|
eta_text = times.eta_berth.strftime("%d.%m.%Y %H:%M") if shipcall.type == 1 else times.etd_berth.strftime("%d.%m.%Y %H:%M")
|
|
|
|
text = f"{ship.name} ({shipcall_type}) - {eta_text} - {berth_text}"
|
|
|
|
element = element.replace("[[text]]", text)
|
|
element = element.replace("[[notification_text]]", message_type["msg_text"])
|
|
|
|
replacement += element
|
|
|
|
body = body.replace("[[NOTIFICATION_ELEMENTS]]", replacement)
|
|
msg.set_content(body, subtype='html', charset='utf-8', cte='8bit')
|
|
|
|
conn.sendmail(defs.email_credentials["sender"], user_email, msg.as_string())
|
|
|
|
except Exception as ex:
|
|
logging.error(ex)
|
|
finally:
|
|
if conn is not None:
|
|
conn.quit()
|
|
if pooledConnection is not None:
|
|
pooledConnection.close()
|
|
|
|
|
|
def SendNotifications():
|
|
# perhaps this will be moved somewhere else later
|
|
try:
|
|
# find all notifications in level 1
|
|
pooledConnection = getPoolConnection()
|
|
query = "SELECT * from notification WHERE level = 1"
|
|
commands = pydapper.using(pooledConnection)
|
|
data = commands.query(query, model=model.Notification)
|
|
if len(data) == 0:
|
|
return
|
|
|
|
# cache participants and users for performance beforehand
|
|
query = "SELECT * from participant";
|
|
participants = commands.query(query, model=model.Participant)
|
|
|
|
email_dict = dict()
|
|
users_dict = dict()
|
|
user_query = "SELECT * from user"
|
|
users = commands.query(user_query)
|
|
for participant in participants:
|
|
for user in users:
|
|
if user["participant_id"] == participant.id:
|
|
if not participant.id in users_dict:
|
|
users_dict[participant.id] = []
|
|
users_dict[participant.id].append(user)
|
|
# break
|
|
|
|
|
|
for notification in data:
|
|
if not notification.participant_id: # no participant defined, this update goes to all participants of this shipcall
|
|
p_query = "SELECT * from shipcall_participant_map where shipcall_id = ?id?"
|
|
assigned_participants = commands.query(p_query, model=model.ShipcallParticipantMap, param={"id":notification.shipcall_id})
|
|
for assigned_participant in assigned_participants:
|
|
if not assigned_participant.participant_id in users_dict:
|
|
continue
|
|
users = users_dict[assigned_participant.participant_id]
|
|
for user in users:
|
|
# send notification to user
|
|
if user["notify_email"]:
|
|
if user["user_email"] not in email_dict:
|
|
email_dict[user["user_email"]] = []
|
|
if notification not in email_dict[user["user_email"]]:
|
|
email_dict[user["user_email"]].append(notification)
|
|
if user["notify_whatsapp"]:
|
|
# TBD
|
|
pass
|
|
if user["notify_signal"]:
|
|
# TBD
|
|
pass
|
|
else:
|
|
if notification.participant_id in users_dict:
|
|
users = users_dict[notification.participant_id]
|
|
for user in users:
|
|
user_notifications = model.bitflag_to_list(user["notify_event"])
|
|
# send notification to user
|
|
if user["notify_email"] and notification.type in user_notifications:
|
|
if user["user_email"] not in email_dict:
|
|
email_dict[user["user_email"]] = []
|
|
if notification not in email_dict[user["user_email"]]:
|
|
email_dict[user["user_email"]].append(notification)
|
|
if user["notify_whatsapp"] and notification.type in user_notifications:
|
|
# TBD
|
|
pass
|
|
if user["notify_signal"] and notification.type in user_notifications:
|
|
# TBD
|
|
pass
|
|
|
|
# mark as sent
|
|
commands.execute("UPDATE notification SET level = 2 WHERE id = ?id?", param={"id":notification.id})
|
|
|
|
# send emails (if any)
|
|
if len(email_dict) > 0:
|
|
SendEmails(email_dict)
|
|
|
|
except Exception as ex:
|
|
logging.error(ex)
|
|
finally:
|
|
if pooledConnection is not None:
|
|
pooledConnection.close()
|
|
|
|
def add_function_to_schedule__update_shipcalls(interval_in_minutes:int, options:dict={'past_days':2}):
|
|
kwargs_ = {"options":options}
|
|
schedule.every(interval_in_minutes).minutes.do(UpdateShipcalls, **kwargs_)
|
|
return
|
|
|
|
def add_function_to_evaluate_notifications(interval_in_minutes:int=1):
|
|
schedule.every(1).minutes.do(UpdateNotifications, interval_in_minutes)
|
|
return
|
|
|
|
def add_function_to_clear_notifications(interval_in_days:int=3):
|
|
schedule.every(30).minutes.do(ClearNotifications, interval_in_days)
|
|
return
|
|
|
|
def add_function_to_schedule_send_notifications(interval_in_minutes:int=1):
|
|
schedule.every(interval_in_minutes).minutes.do(SendNotifications)
|
|
return
|
|
|
|
def eval_next_24_hrs():
|
|
try:
|
|
pooledConnection = getPoolConnection()
|
|
commands = pydapper.using(pooledConnection)
|
|
query = SQLQuery.get_next24hrs_shipcalls()
|
|
data = commands.query(query)
|
|
nquery = "INSERT INTO notification (shipcall_id, participant_id, level, type, message) VALUES (?shipcall_id?, ?participant_id?, 0, 2, ?message?)"
|
|
for shipcall in data:
|
|
existing_notifications = get_notification_for_shipcall_and_type(shipcall["id"], 2)
|
|
query = SQLQuery.get_shipcall_participant_map_by_shipcall_id()
|
|
participants = commands.query(query, model=dict, param={"id":shipcall["id"]})
|
|
for participant in participants:
|
|
if participant["type"] == 1: # BSMD
|
|
continue
|
|
# if participant["type"] == 32: # PORT AUTHORITY # Christin: Brake möchte sie vielleicht doch haben
|
|
# continue
|
|
# check if "open" notification already exists
|
|
found_notification = False
|
|
for existing_notification in existing_notifications:
|
|
if existing_notification["participant_id"] == participant["id"] and existing_notification["level"] == 0:
|
|
found_notification = True
|
|
break
|
|
if not found_notification:
|
|
commands.execute(nquery, param={"shipcall_id":shipcall["id"], "participant_id": participant["participant_id"], "message":shipcall["name"]})
|
|
|
|
except Exception as ex:
|
|
logging.error(ex)
|
|
|
|
finally:
|
|
if pooledConnection is not None:
|
|
pooledConnection.close()
|
|
|
|
return
|
|
|
|
def setup_schedule(update_shipcalls_interval_in_minutes:int=60):
|
|
|
|
logging.getLogger('schedule').setLevel(logging.INFO); # set the logging level of the schedule module to INFO
|
|
|
|
schedule.clear() # clear all routine jobs. This prevents jobs from being created multiple times
|
|
|
|
# update the evaluation state in every recent shipcall
|
|
add_function_to_schedule__update_shipcalls(update_shipcalls_interval_in_minutes)
|
|
|
|
add_function_to_evaluate_notifications(defs.NOTIFICATION_COOLDOWN_MINS)
|
|
|
|
add_function_to_clear_notifications(defs.NOTIFICATION_MAX_AGE_DAYS)
|
|
|
|
schedule.every().day.at("09:00").do(eval_next_24_hrs)
|
|
|
|
add_function_to_schedule_send_notifications(1)
|
|
|
|
return
|
|
|
|
|
|
def run_schedule_permanently(latency:int=30):
|
|
"""permanently runs the scheduler. Any time a routine task is 'due', it will be executed. There is an interval between each iteration to reduce workload"""
|
|
while True:
|
|
schedule.run_pending()
|
|
time.sleep(latency)
|
|
return
|
|
|
|
def run_schedule_permanently_in_background(latency:int=30, thread_name:str="run_schedule_in_background"):
|
|
"""create a (daemon) thread in the background, which permanently executes the scheduled functions"""
|
|
thread_names = [thread_.name for thread_ in threading.enumerate()]
|
|
if not thread_name in thread_names:
|
|
bg_thread = threading.Thread(target=run_schedule_permanently, name=thread_name, kwargs={"latency":latency})
|
|
bg_thread.daemon = True # 'daemon': stops, when Python's main thread stops.
|
|
bg_thread.start()
|
|
return
|