git_brcal/src/server/BreCal/services/schedule_routines.py

import logging
import pydapper
from BreCal.schemas import model
from BreCal.local_db import getPoolConnection
from BreCal.database.update_database import evaluate_shipcall_state
import threading
import schedule
import time
options = {'past_days':7}

def UpdateShipcalls(options:dict = {'past_days':2}):
    """
    Performs a single update step for every shipcall that matches the criteria.
    1.) opens an SQL connection
    2.) queries every shipcall that fulfills the {options} criteria
    3.) iterates over each shipcall and re-evaluates it
    4.) closes the SQL connection

    options:
        key 'past_days': used to execute a filtered query of all available shipcalls. Defaults to 2 (days).
    """
    pooledConnection = None
    try:
        pooledConnection = getPoolConnection()
        commands = pydapper.using(pooledConnection)
        query = ("SELECT s.id as id, ship_id, type, eta, voyage, etd, arrival_berth_id, departure_berth_id, tug_required, pilot_required, "
                 "flags, s.pier_side, bunkering, replenishing_terminal, replenishing_lock, draft, tidal_window_from, tidal_window_to, rain_sensitive_cargo, recommended_tugs, "
                 "anchored, moored_lock, canceled, evaluation, evaluation_message, evaluation_notifications_sent, evaluation_time, s.created as created, s.modified as modified, time_ref_point FROM shipcall s "
                 "LEFT JOIN times t ON t.shipcall_id = s.id AND t.participant_type = 8 "
                 "WHERE "
                 "(type = 1 AND (COALESCE(t.eta_berth, eta) >= DATE(NOW() - INTERVAL %d DAY))) OR "
                 "((type = 2 OR type = 3) AND (COALESCE(t.etd_berth, etd) >= DATE(NOW() - INTERVAL %d DAY))) "
                 "ORDER BY s.id") % (options["past_days"], options["past_days"])
        # obtain the matching shipcalls from the MySQL database
        data = commands.query(query, model=model.Shipcall)
        # collect the shipcall ids of interest
        shipcall_ids = [dat.id for dat in data]
        # iterate over each shipcall id and (re-)evaluate it
        for shipcall_id in shipcall_ids:
            # apply the 'Traffic Light' evaluation to obtain a 'GREEN', 'YELLOW' or 'RED' state.
            # The function internally updates the evaluation columns of the MySQL database.
            evaluate_shipcall_state(mysql_connector_instance=pooledConnection, shipcall_id=shipcall_id)
    except Exception as ex:
        logging.error(ex)
    finally:
        # return the connection to the pool, even if querying or evaluating failed
        if pooledConnection is not None:
            pooledConnection.close()
    return
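
# Example (sketch, not part of the scheduled flow): an ad-hoc re-evaluation over a wider
# window, e.g. one week. 'past_days' is the only option currently read by UpdateShipcalls.
#   UpdateShipcalls(options={'past_days': 7})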

def add_function_to_schedule__update_shipcalls(interval_in_minutes:int, options:dict={'past_days':2}):
    kwargs_ = {"options": options}
    schedule.every(interval_in_minutes).minutes.do(UpdateShipcalls, **kwargs_)
    return
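
# Example (sketch): register the update job at a 15-minute interval with a one-week
# look-back window. Both values are illustrative, not application defaults.
#   add_function_to_schedule__update_shipcalls(15, options={'past_days': 7})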

def setup_schedule(update_shipcalls_interval_in_minutes:int=60):
    logging.getLogger('schedule').setLevel(logging.INFO)  # set the logging level of the 'schedule' module to INFO
    schedule.clear()  # clear all routine jobs. This prevents jobs from being created multiple times
    # update the evaluation state of every recent shipcall
    add_function_to_schedule__update_shipcalls(update_shipcalls_interval_in_minutes)
    # placeholder: create/send notifications
    # add_function_to_schedule__send_notifications(...)
    return

def run_schedule_permanently(latency:int=30):
    """permanently runs the scheduler. Any time a routine task is 'due', it will be executed. There is a sleep interval between iterations to reduce workload"""
    while True:
        schedule.run_pending()
        time.sleep(latency)

def run_schedule_permanently_in_background(latency:int=30, thread_name:str="run_schedule_in_background"):
    """creates a (daemon) thread in the background, which permanently executes the scheduled functions"""
    thread_names = [thread_.name for thread_ in threading.enumerate()]
    if thread_name not in thread_names:
        bg_thread = threading.Thread(target=run_schedule_permanently, name=thread_name, kwargs={"latency": latency})
        bg_thread.daemon = True  # 'daemon': stops when Python's main thread stops.
        bg_thread.start()
    return
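
if __name__ == "__main__":
    # Usage sketch (assumption: the application normally calls setup_schedule() and
    # run_schedule_permanently_in_background() from its own startup code; running this
    # module directly is shown purely for illustration).
    logging.basicConfig(level=logging.INFO)
    setup_schedule(update_shipcalls_interval_in_minutes=60)
    # blocking variant: run the scheduler loop in the foreground
    run_schedule_permanently(latency=30)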