adding schedule routine jobs, which will be executed in a background thread within WSGI while running the Flask application.
This commit is contained in:
parent
9346920a97
commit
fd0efe0046
@ -31,6 +31,7 @@ from BreCal.stubs.times_tug import get_times_tug
|
|||||||
from BreCal.stubs.times_full import get_times_full_simple
|
from BreCal.stubs.times_full import get_times_full_simple
|
||||||
from BreCal.stubs.df_times import get_df_times
|
from BreCal.stubs.df_times import get_df_times
|
||||||
|
|
||||||
|
from BreCal.services.schedule_routines import setup_schedule, run_schedule_permanently_in_background
|
||||||
|
|
||||||
def create_app(test_config=None):
|
def create_app(test_config=None):
|
||||||
|
|
||||||
@ -64,6 +65,11 @@ def create_app(test_config=None):
|
|||||||
local_db.initPool(os.path.dirname(app.instance_path))
|
local_db.initPool(os.path.dirname(app.instance_path))
|
||||||
logging.info('App started')
|
logging.info('App started')
|
||||||
|
|
||||||
|
# Setup Routine jobs (e.g., reevaluation of shipcalls)
|
||||||
|
setup_schedule(update_shipcalls_interval_in_minutes=60)
|
||||||
|
run_schedule_permanently_in_background(latency=30)
|
||||||
|
logging.info('Routine Jobs are defined.')
|
||||||
|
|
||||||
return app
|
return app
|
||||||
|
|
||||||
__all__ = [
|
__all__ = [
|
||||||
|
|||||||
81
src/server/BreCal/services/schedule_routines.py
Normal file
81
src/server/BreCal/services/schedule_routines.py
Normal file
@ -0,0 +1,81 @@
|
|||||||
|
import logging
|
||||||
|
import pydapper
|
||||||
|
from BreCal.schemas import model
|
||||||
|
from BreCal.local_db import getPoolConnection
|
||||||
|
from BreCal.database.update_database import evaluate_shipcall_state
|
||||||
|
|
||||||
|
import threading
|
||||||
|
import schedule
|
||||||
|
import time
|
||||||
|
|
||||||
|
options = {'past_days':7}
|
||||||
|
|
||||||
|
def UpdateShipcalls(options:dict = {'past_days':2}):
|
||||||
|
"""
|
||||||
|
performs a single update step of every shipcall that matches the criteria.
|
||||||
|
|
||||||
|
1.) opens an SQL connection
|
||||||
|
2.) queries every shipcall that fulfills the {options} criteria
|
||||||
|
3.) iterates over each shipcall
|
||||||
|
4.) closes the SQL connection
|
||||||
|
|
||||||
|
options:
|
||||||
|
key: 'past_days'. Is used to execute a filtered query of all available shipcalls. Defaults to 2 (days)
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
pooledConnection = getPoolConnection()
|
||||||
|
commands = pydapper.using(pooledConnection)
|
||||||
|
query = ("SELECT id, ship_id, type, eta, voyage, etd, arrival_berth_id, departure_berth_id, tug_required, pilot_required, "
|
||||||
|
"flags, pier_side, bunkering, replenishing_terminal, replenishing_lock, draft, tidal_window_from, tidal_window_to, rain_sensitive_cargo, recommended_tugs, "
|
||||||
|
"anchored, moored_lock, canceled, evaluation, evaluation_message, created, modified FROM shipcall WHERE ((type = 1 OR type = 3) AND eta >= DATE(NOW() - INTERVAL %d DAY)"
|
||||||
|
"OR (type = 2 AND etd >= DATE(NOW() - INTERVAL %d DAY))) "
|
||||||
|
"ORDER BY eta") % (options["past_days"], options["past_days"])
|
||||||
|
|
||||||
|
# obtain data from the MYSQL database
|
||||||
|
data = commands.query(query, model=model.Shipcall)
|
||||||
|
|
||||||
|
# get the shipcall ids, which are of interest
|
||||||
|
shipcall_ids = [dat.id for dat in data]
|
||||||
|
|
||||||
|
# iterate over each shipcall id and (re-) evaluate it
|
||||||
|
for shipcall_id in shipcall_ids:
|
||||||
|
# apply 'Traffic Light' evaluation to obtain 'GREEN', 'YELLOW' or 'RED' evaluation state. The function internally updates the mysql database
|
||||||
|
evaluate_shipcall_state(mysql_connector_instance=pooledConnection, shipcall_id=shipcall_id) # new_id (last insert id) refers to the shipcall id
|
||||||
|
|
||||||
|
pooledConnection.close()
|
||||||
|
|
||||||
|
except Exception as ex:
|
||||||
|
logging.error(ex)
|
||||||
|
return
|
||||||
|
|
||||||
|
def add_function_to_schedule__update_shipcalls(interval_in_minutes:int, options:dict={'past_days':2}):
|
||||||
|
kwargs_ = {"options":options}
|
||||||
|
schedule.every(interval_in_minutes).minutes.do(UpdateShipcalls, **kwargs_)
|
||||||
|
return
|
||||||
|
|
||||||
|
def setup_schedule(update_shipcalls_interval_in_minutes:int=60):
|
||||||
|
schedule.clear() # clear all routine jobs. This prevents jobs from being created multiple times
|
||||||
|
|
||||||
|
# update the evaluation state in every recent shipcall
|
||||||
|
add_function_to_schedule__update_shipcalls(update_shipcalls_interval_in_minutes)
|
||||||
|
|
||||||
|
# placeholder: create/send notifications
|
||||||
|
# add_function_to_schedule__send_notifications(...)
|
||||||
|
return
|
||||||
|
|
||||||
|
|
||||||
|
def run_schedule_permanently(latency:int=30):
|
||||||
|
"""permanently runs the scheduler. Any time a routine task is 'due', it will be executed. There is an interval between each iteration to reduce workload"""
|
||||||
|
while True:
|
||||||
|
schedule.run_pending()
|
||||||
|
time.sleep(latency)
|
||||||
|
return
|
||||||
|
|
||||||
|
def run_schedule_permanently_in_background(latency:int=30, thread_name:str="run_schedule_in_background"):
|
||||||
|
"""create a (daemon) thread in the background, which permanently executes the scheduled functions"""
|
||||||
|
thread_names = [thread_.name for thread_ in threading.enumerate()]
|
||||||
|
if not thread_name in thread_names:
|
||||||
|
bg_thread = threading.Thread(target=run_schedule_permanently, name=thread_name, kwargs={"latency":latency})
|
||||||
|
bg_thread.daemon = True # 'daemon': stops, when Python's main thread stops.
|
||||||
|
bg_thread.start()
|
||||||
|
return
|
||||||
@ -2,6 +2,9 @@ import os
|
|||||||
import sys
|
import sys
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
|
# import the schedule library, which handles routine jobs
|
||||||
|
import schedule
|
||||||
|
|
||||||
sys.path.insert(0, '/var/www/brecal_devel/src/server')
|
sys.path.insert(0, '/var/www/brecal_devel/src/server')
|
||||||
sys.path.insert(0, '/var/www/venv/lib/python3.10/site-packages/')
|
sys.path.insert(0, '/var/www/venv/lib/python3.10/site-packages/')
|
||||||
|
|
||||||
@ -11,6 +14,8 @@ os.environ['SECRET_KEY'] = 'zdiTz8P3jXOc7jztIQAoelK4zztyuCpJ'
|
|||||||
# Set up logging
|
# Set up logging
|
||||||
logging.basicConfig(stream=sys.stderr, level=logging.DEBUG)
|
logging.basicConfig(stream=sys.stderr, level=logging.DEBUG)
|
||||||
|
|
||||||
|
# Set up Scheduled Jobs
|
||||||
|
|
||||||
# Import and run the Flask app
|
# Import and run the Flask app
|
||||||
from BreCal import create_app
|
from BreCal import create_app
|
||||||
application = create_app()
|
application = create_app()
|
||||||
Loading…
Reference in New Issue
Block a user