git_brcal/src/server/BreCal/local_db.py

84 lines
2.6 KiB
Python

import mysql.connector
from mysql.connector import pooling
import pydapper
import logging
import json
import os
import sys
from BreCal.schemas import defs
config_path = None
_connection_pool = None
def _load_json(path):
with open(path, encoding="utf-8") as fh:
return json.load(fh)
def _build_pool_config(connection_data, pool_name, pool_size):
pool_config = dict(connection_data)
pool_config.setdefault("pool_name", pool_name)
pool_config.setdefault("pool_size", pool_size)
return pool_config
def initPool(instancePath, connection_filename="connection_data_test.json",
pool_name="brecal_test_pool", pool_size=10):
"""
Initialize the MySQL connection pool and load email credentials.
"""
global config_path, _connection_pool
try:
if config_path is None:
config_path = os.path.join(instancePath, f'../../../secure/{connection_filename}')
# config_path = 'C:\\temp\\connection_data_test.json'
print(config_path)
if not os.path.exists(config_path):
print('cannot find ' + os.path.abspath(config_path))
print("instance path", instancePath)
sys.exit(1)
connection_data = _load_json(config_path)
if _connection_pool is None:
pool_config = _build_pool_config(connection_data, pool_name, pool_size)
_connection_pool = pooling.MySQLConnectionPool(**pool_config)
conn_from_pool = _connection_pool.get_connection()
try:
commands = pydapper.using(conn_from_pool)
commands.query("SELECT id from `user` LIMIT 1")
print("DB connection successful")
finally:
conn_from_pool.close()
credentials_file = "email_credentials_test.json"
credentials_path = os.path.join(instancePath, f'../../../secure/{credentials_file}')
# credentials_path = 'C:\\temp\\email_credentials_test.json'
if not os.path.exists(credentials_path):
print('cannot find ' + os.path.abspath(credentials_path))
sys.exit(1)
defs.email_credentials = _load_json(credentials_path)
except mysql.connector.PoolError as e:
logging.error(f"Failed to create connection pool: {e}")
print(e)
except Exception as e:
logging.error("Failed to initialize DB pool: %s", e)
print(e)
def getPoolConnection():
if _connection_pool is None:
raise RuntimeError("Connection pool not initialized. Call initPool first.")
try:
return _connection_pool.get_connection()
except mysql.connector.PoolError as exc:
logging.error("Connection pool exhausted: %s", exc)
raise