import logging
import threading
import time
from typing import Optional

import pydapper
import schedule

from BreCal.schemas import model
from BreCal.local_db import getPoolConnection
from BreCal.database.update_database import evaluate_shipcall_state
from BreCal.database.sql_queries import create_sql_query_shipcall_get

# NOTE(review): this module-level dict is not referenced by any function in
# this file; presumably it is imported by callers elsewhere -- confirm before
# removing or changing it.
options = {'past_days':7}


def UpdateShipcalls(options: Optional[dict] = None) -> None:
    """
    Perform a single update step for every shipcall that matches the criteria.

    Steps:
        1.) obtain a pooled SQL connection
        2.) query every shipcall that fulfills the {options} criteria
        3.) (re-)evaluate each of those shipcalls
        4.) close the SQL connection (always, even if a previous step failed)

    Any exception is logged and swallowed, so a failing update never
    propagates into the caller (e.g. the scheduler loop).

    options:
        key: 'past_days'. Passed to create_sql_query_shipcall_get to filter the
        queried shipcalls. Defaults to {'past_days': 2} when omitted or None.
    """
    if options is None:
        # build a fresh dict per call instead of sharing one mutable default
        options = {'past_days': 2}

    pooled_connection = None
    try:
        pooled_connection = getPoolConnection()
        commands = pydapper.using(pooled_connection)

        # obtain the shipcalls from the database; 'options' restricts the
        # result (per the original author's note: by ETA within 'past_days')
        query = create_sql_query_shipcall_get(options)
        shipcalls = commands.query(query, model=model.Shipcall)

        # collect the ids first, so the query result is fully consumed before
        # the same connection is handed to the evaluation step
        shipcall_ids = [shipcall.id for shipcall in shipcalls]

        for shipcall_id in shipcall_ids:
            # per the original author's note: applies the 'Traffic Light'
            # evaluation ('GREEN', 'YELLOW' or 'RED') and updates the database
            # internally
            evaluate_shipcall_state(mysql_connector_instance=pooled_connection, shipcall_id=shipcall_id)

    except Exception as ex:
        logging.error(ex)

    finally:
        # release the connection even when the query or an evaluation raised;
        # a failing close() is logged rather than raised, matching the
        # log-and-continue behaviour of the rest of this function
        if pooled_connection is not None:
            try:
                pooled_connection.close()
            except Exception as ex:
                logging.error(ex)
    return


def add_function_to_schedule__update_shipcalls(interval_in_minutes: int, options: Optional[dict] = None) -> None:
    """
    Register UpdateShipcalls as a recurring job.

    interval_in_minutes: period between two executions of the job.
    options: forwarded to UpdateShipcalls as keyword argument 'options'.
        Defaults to {'past_days': 2} when omitted or None.
    """
    if options is None:
        options = {'past_days': 2}

    job_kwargs = {"options": options}
    schedule.every(interval_in_minutes).minutes.do(UpdateShipcalls, **job_kwargs)
    return


def add_function_to_schedule__send_notifications(vr, interval_in_minutes: int = 10) -> None:
    """
    Register vr.notifier.send_notifications as a recurring job.

    vr: object exposing 'notifier.send_notifications' (type not visible in
        this file).
    interval_in_minutes: period between two executions of the job.
    """
    schedule.every(interval_in_minutes).minutes.do(vr.notifier.send_notifications)
    return


def setup_schedule(update_shipcalls_interval_in_minutes: int = 60) -> None:
    """
    (Re-)create the routine jobs of this module.

    update_shipcalls_interval_in_minutes: period of the UpdateShipcalls job.
    """
    # set the logging level of the schedule module to INFO
    logging.getLogger('schedule').setLevel(logging.INFO)

    # clear all routine jobs. This prevents jobs from being created multiple times
    schedule.clear()

    # update the evaluation state in every recent shipcall
    add_function_to_schedule__update_shipcalls(update_shipcalls_interval_in_minutes)

    # placeholder: create/send notifications
    # add_function_to_schedule__send_notifications(...)
    return


def run_schedule_permanently(latency: int = 30) -> None:
    """
    Run the scheduler forever; this function never returns.

    Every iteration executes the jobs that are due and then sleeps for
    'latency' seconds to reduce workload.
    """
    while True:
        schedule.run_pending()
        time.sleep(latency)


def run_schedule_permanently_in_background(latency: int = 30, thread_name: str = "run_schedule_in_background") -> None:
    """
    Start a daemon thread which permanently executes the scheduled functions.

    Nothing is started when a live thread named 'thread_name' already exists,
    so repeated calls do not spawn additional scheduler loops.

    latency: sleep time in seconds between two scheduler iterations.
    thread_name: name of the background thread; also used for the
        already-running check.
    """
    running_thread_names = [thread_.name for thread_ in threading.enumerate()]
    if thread_name not in running_thread_names:
        bg_thread = threading.Thread(
            target=run_schedule_permanently,
            name=thread_name,
            kwargs={"latency": latency},
            daemon=True,  # 'daemon': stops, when Python's main thread stops.
        )
        bg_thread.start()
    return