From fd0efe004670a03ecb5ea60e6f992964e2a2b724 Mon Sep 17 00:00:00 2001 From: scopesorting Date: Tue, 21 Nov 2023 12:47:04 +0100 Subject: [PATCH] adding schedule routine jobs, which will be executed in a background thread within WSGI while running the Flask application. --- src/server/BreCal/__init__.py | 6 ++ .../BreCal/services/schedule_routines.py | 81 +++++++++++++++++++ src/server/flaskapp.wsgi | 5 ++ 3 files changed, 92 insertions(+) create mode 100644 src/server/BreCal/services/schedule_routines.py diff --git a/src/server/BreCal/__init__.py b/src/server/BreCal/__init__.py index c4cf995..4ee4bbf 100644 --- a/src/server/BreCal/__init__.py +++ b/src/server/BreCal/__init__.py @@ -31,6 +31,7 @@ from BreCal.stubs.times_tug import get_times_tug from BreCal.stubs.times_full import get_times_full_simple from BreCal.stubs.df_times import get_df_times +from BreCal.services.schedule_routines import setup_schedule, run_schedule_permanently_in_background def create_app(test_config=None): @@ -64,6 +65,11 @@ def create_app(test_config=None): local_db.initPool(os.path.dirname(app.instance_path)) logging.info('App started') + # Setup Routine jobs (e.g., reevaluation of shipcalls) + setup_schedule(update_shipcalls_interval_in_minutes=60) + run_schedule_permanently_in_background(latency=30) + logging.info('Routine Jobs are defined.') + return app __all__ = [ diff --git a/src/server/BreCal/services/schedule_routines.py b/src/server/BreCal/services/schedule_routines.py new file mode 100644 index 0000000..a2c33dc --- /dev/null +++ b/src/server/BreCal/services/schedule_routines.py @@ -0,0 +1,81 @@ +import logging +import pydapper +from BreCal.schemas import model +from BreCal.local_db import getPoolConnection +from BreCal.database.update_database import evaluate_shipcall_state + +import threading +import schedule +import time + +options = {'past_days':7} + +def UpdateShipcalls(options:dict = {'past_days':2}): + """ + performs a single update step of every shipcall that matches the criteria. + + 1.) opens an SQL connection + 2.) queries every shipcall that fulfills the {options} criteria + 3.) iterates over each shipcall + 4.) closes the SQL connection + + options: + key: 'past_days'. Is used to execute a filtered query of all available shipcalls. Defaults to 2 (days) + """ + try: + pooledConnection = getPoolConnection() + commands = pydapper.using(pooledConnection) + query = ("SELECT id, ship_id, type, eta, voyage, etd, arrival_berth_id, departure_berth_id, tug_required, pilot_required, " + "flags, pier_side, bunkering, replenishing_terminal, replenishing_lock, draft, tidal_window_from, tidal_window_to, rain_sensitive_cargo, recommended_tugs, " + "anchored, moored_lock, canceled, evaluation, evaluation_message, created, modified FROM shipcall WHERE ((type = 1 OR type = 3) AND eta >= DATE(NOW() - INTERVAL %d DAY)" + "OR (type = 2 AND etd >= DATE(NOW() - INTERVAL %d DAY))) " + "ORDER BY eta") % (options["past_days"], options["past_days"]) + + # obtain data from the MYSQL database + data = commands.query(query, model=model.Shipcall) + + # get the shipcall ids, which are of interest + shipcall_ids = [dat.id for dat in data] + + # iterate over each shipcall id and (re-) evaluate it + for shipcall_id in shipcall_ids: + # apply 'Traffic Light' evaluation to obtain 'GREEN', 'YELLOW' or 'RED' evaluation state. The function internally updates the mysql database + evaluate_shipcall_state(mysql_connector_instance=pooledConnection, shipcall_id=shipcall_id) # new_id (last insert id) refers to the shipcall id + + pooledConnection.close() + + except Exception as ex: + logging.error(ex) + return + +def add_function_to_schedule__update_shipcalls(interval_in_minutes:int, options:dict={'past_days':2}): + kwargs_ = {"options":options} + schedule.every(interval_in_minutes).minutes.do(UpdateShipcalls, **kwargs_) + return + +def setup_schedule(update_shipcalls_interval_in_minutes:int=60): + schedule.clear() # clear all routine jobs. This prevents jobs from being created multiple times + + # update the evaluation state in every recent shipcall + add_function_to_schedule__update_shipcalls(update_shipcalls_interval_in_minutes) + + # placeholder: create/send notifications + # add_function_to_schedule__send_notifications(...) + return + + +def run_schedule_permanently(latency:int=30): + """permanently runs the scheduler. Any time a routine task is 'due', it will be executed. There is an interval between each iteration to reduce workload""" + while True: + schedule.run_pending() + time.sleep(latency) + return + +def run_schedule_permanently_in_background(latency:int=30, thread_name:str="run_schedule_in_background"): + """create a (daemon) thread in the background, which permanently executes the scheduled functions""" + thread_names = [thread_.name for thread_ in threading.enumerate()] + if not thread_name in thread_names: + bg_thread = threading.Thread(target=run_schedule_permanently, name=thread_name, kwargs={"latency":latency}) + bg_thread.daemon = True # 'daemon': stops, when Python's main thread stops. + bg_thread.start() + return diff --git a/src/server/flaskapp.wsgi b/src/server/flaskapp.wsgi index 1591b35..706e336 100644 --- a/src/server/flaskapp.wsgi +++ b/src/server/flaskapp.wsgi @@ -2,6 +2,9 @@ import os import sys import logging +# import the schedule library, which handles routine jobs +import schedule + sys.path.insert(0, '/var/www/brecal_devel/src/server') sys.path.insert(0, '/var/www/venv/lib/python3.10/site-packages/') @@ -11,6 +14,8 @@ os.environ['SECRET_KEY'] = 'zdiTz8P3jXOc7jztIQAoelK4zztyuCpJ' # Set up logging logging.basicConfig(stream=sys.stderr, level=logging.DEBUG) +# Set up Scheduled Jobs + # Import and run the Flask app from BreCal import create_app application = create_app() \ No newline at end of file