Feature/removing pandas warning (#14)

* removing the Pandas 'sqlalchemy' warning by refactoring

* removing the Pandas 'sqlalchemy' warning by refactoring.

* reformatting
This commit is contained in:
scopesorting 2023-11-07 07:08:52 +01:00 committed by GitHub
parent b75ea6891c
commit 2374cf4ff0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -26,7 +26,7 @@ class SQLHandler():
with self.sql_connection.cursor(buffered=True) as cursor: with self.sql_connection.cursor(buffered=True) as cursor:
cursor.execute("SHOW TABLES") cursor.execute("SHOW TABLES")
schema = cursor.fetchall() schema = cursor.fetchall()
all_schemas = [schem[0] for schem in schema] all_schemas = [schem[0] for schem in schema]
return all_schemas return all_schemas
def build_str_to_model_dict(self): def build_str_to_model_dict(self):
@ -42,13 +42,51 @@ class SQLHandler():
def read_mysql_table_to_df(self, table_name: str):
    """Read every row of a MySQL table into a pandas DataFrame.

    Args:
        table_name: Name of the table to read. It is interpolated directly
            into the SQL text, so it must come from a trusted source.

    Returns:
        A DataFrame with one row per table row. When ``str_to_model_dict``
        holds a data model for ``table_name``, each row is first passed
        through that model (``data_model(**row)``) and the frame is built
        from the model instances; otherwise it is built from plain dicts
        and keeps the table's column names even when the table is empty.
    """
    with self.sql_connection.cursor(buffered=True) as cursor:
        # NOTE(review): identifiers cannot be bound as query parameters, so
        # the table name is formatted in. Verify that callers only pass
        # names obtained from the database itself.
        cursor.execute(f"SELECT * FROM {table_name}")
        # The cursor already reports the result-set column names (DB-API
        # `description`), so no separate DESCRIBE round trip is required and
        # names are guaranteed to line up with the fetched tuples.
        column_names = [column[0] for column in (cursor.description or ())]
        rows = cursor.fetchall()

    # Map each data tuple onto its column names.
    records = [dict(zip(column_names, row)) for row in rows]

    # Build the frame through the data model when one is registered.
    data_model = self.str_to_model_dict.get(table_name)
    if data_model is not None:
        return pd.DataFrame([data_model(**record) for record in records])
    return pd.DataFrame(records, columns=column_names)
def mysql_to_df(self, query, table_name=None):
    """Run an SQL query on the MySQL connection and return a DataFrame.

    Args:
        query: SQL statement to execute. It is sent to the server as-is.
        table_name: Optional key into ``str_to_model_dict``. When a data
            model is registered under it, every row is passed through that
            model (``data_model(**row)``) before the frame is built. With
            ``None`` (or an unregistered name) plain dicts are used.

    Returns:
        A DataFrame holding the query result. If the result has an ``id``
        column, it becomes the index.
    """
    with self.sql_connection.cursor(buffered=True) as cursor:
        cursor.execute(query)
        # Take the column names from the executed query itself. Names taken
        # from the table definition only match a bare `SELECT *`; any
        # projection, reordering or alias would mislabel the values.
        column_names = [column[0] for column in (cursor.description or ())]
        rows = cursor.fetchall()

    # Map each data tuple onto its column names.
    records = [dict(zip(column_names, row)) for row in rows]

    data_model = None
    if table_name is not None:
        data_model = self.str_to_model_dict.get(table_name)

    if data_model is not None:
        df = pd.DataFrame([data_model(**record) for record in records])
    else:
        # Duplicate names (e.g. from joins) collapse inside the dicts above,
        # so de-duplicate the column list the same way, keeping first-seen order.
        df = pd.DataFrame(records, columns=list(dict.fromkeys(column_names)))

    if "id" in df.columns:
        # set_index returns a new frame, so the fetched data is not mutated.
        df = df.set_index("id")
    return df
def read_all(self, all_schemas): def read_all(self, all_schemas):
@ -64,7 +102,7 @@ class SQLHandler():
mysql_df_dict = {} mysql_df_dict = {}
for schem in all_schemas: for schem in all_schemas:
query = f"SELECT * FROM {schem}" query = f"SELECT * FROM {schem}"
mysql_df_dict[schem] = self.mysql_to_df(query) mysql_df_dict[schem] = self.mysql_to_df(query, table_name=schem)
return mysql_df_dict return mysql_df_dict
def initialize_shipcall_participant_list(self): def initialize_shipcall_participant_list(self):