postgresmanager/postgresmanager.py

import psycopg
import logging
import json
from psycopg import sql
from psycopg.rows import dict_row, DictRow
from typing import Any, Dict
log = logging.getLogger(__name__)
class PostgresManager:
"""Python wrapper for the psycopg API to streamline interactions with MAGNET Databases. When the object
gets deleted, it will explicitly close the instance's connection to the underlying database
Returns:
object: PostgresManager object with an ACTIVE Connection to the database.
"""
    def __init__(self, host: str = "",
                 port: int = 5432,
                 database: str = "",
                 user: str = "",
                 password: str = ""):
        log.debug('Creating PSQL connection...')
        # Pass connection parameters as keyword arguments so values containing
        # spaces or quotes cannot break a hand-built conninfo string
        self.__conn = psycopg.connect(host=host,
                                      port=port,
                                      dbname=database,
                                      user=user,
                                      password=password,
                                      row_factory=dict_row,
                                      autocommit=True)
    def __del__(self):
        log.info("Closing PSQL connection...")
        try:
            self.__conn.close()  # Explicitly close rather than just dropping the reference
        except AttributeError:
            pass
def insert(self, table:str, data:Dict[str,Any])->bool:
""" Insert into a given table the dictionary of Column/Value pairs
Args:
table (str): Table name
data (Dict[str,Any]): Column/Value dictionary (e.g { "Col1":"Value1" })
Returns:
            bool: True if the insert was successful, False otherwise
Example:
To insert into the 'names' table, pass data={"name":"sally"}. Equivalent of
            SQL Query "INSERT INTO names(name) VALUES ('sally');"
"""
log.info(f"inserting values into {table} table")
log.debug(f"data: {data}")
        # JSON-serialize any dict values because PostgreSQL does not understand Python dicts
        for k, v in data.items():
            if isinstance(v, dict):
                # Serialize non-empty dicts; store empty dicts as an empty string,
                # otherwise they end up in the database as garbage
                if bool(v):
                    data[k] = json.dumps(v)  # Override with the JSON string
                else:
                    data[k] = ""
columns = sql.SQL(", ").join(map(sql.Identifier, data.keys()))
values = [value for value in data.values()]
log.debug(f"values: {values}")
query = sql.SQL("INSERT INTO {table} ({cols}) VALUES ({placeholders})").format(
table=sql.Identifier(table),
cols=columns,
placeholders=sql.SQL(", ").join(sql.Placeholder() * len(data.values())))
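        # For the docstring example (table="names", data={"name": "sally"}) this composes to:
        #   INSERT INTO "names" ("name") VALUES (%s)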
log.debug(f'SQL Query: {query.as_string(self.__conn)}')
with self.__conn.cursor() as cur:
try:
cur.execute(query,values)
except psycopg.DatabaseError as de:
log.error(f"Database error: {de}")
return False
except Exception as e:
log.critical(f"Critical error: {e}")
return False
        # No explicit commit needed: the connection was opened with autocommit=True
        log.info("Insert successful")
        log.debug("Exiting DB insert...")
        return True
def delete(self, table:str, where:dict[str,Any])->bool:
""" DELETES from a given table.
Args:
table (str): Table to delete from
            where (dict[str,Any]): WHERE condition representing LEFT_HAND = RIGHT_HAND.
                example: to express "DELETE FROM orchid WHERE fruit = 'apple'", pass where={"fruit":"apple"}
Returns:
            bool: True if the transaction was successful, False otherwise
"""
log.info(f"Deleting from {table} table")
log.debug(f"where_condition: {where}")
col = list(where.keys())[0]
query = sql.SQL("DELETE FROM {table} WHERE {col_cond} = %s").format(
table=sql.Identifier(table),
col_cond=sql.Identifier(table,list(where.keys())[0])
)
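        # For the docstring example (table="orchid", where={"fruit": "apple"}) this composes to:
        #   DELETE FROM "orchid" WHERE "orchid"."fruit" = %s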
log.debug(f'SQL Query: {query.as_string(self.__conn)}')
with self.__conn.cursor() as cur:
try:
cur.execute(query,[where.get(col)])
except psycopg.DatabaseError as de:
log.error(f"Database error: {de}")
return False
except Exception as e:
log.critical(f"Critical error: {e}")
return False
        # No explicit commit needed: autocommit is enabled on the connection
        return True
    def select(self, table: str, columns: list[str], where: Dict[str, Any] | None = None) -> list[DictRow] | None:
""" A select query that retrieves values from passed columns and accept a where condition
Args:
table (str): Table to select values from
            columns (list[str]): Names of the columns to retrieve
where (Dict[str,Any] | None, optional): A dictionary representing LEFT_HAND = RIGHT_HAND where condition
Returns:
            list[Dict[str,Any]] | None: A list of row dictionaries if the transaction was successful, None otherwise.
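        Example:
            To fetch the 'name' column from a hypothetical 'users' table where id = 1, call
            select("users", ["name"], where={"id": 1}). Equivalent of
            SQL Query "SELECT name FROM users WHERE id = 1;"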
"""
logging.debug(f"Table: {table}")
logging.debug(f'Where Condition: {where}')
with self.__conn.cursor() as cur:
try:
r = []
if where:
logging.debug("Triggered SELECT with WHERE")
where_key = list(where.keys())[0]
where_value = where.get(where_key)
logging.debug(f'Where_cond key = {where_key}, value = {where.get(where_key)}')
query = sql.SQL("SELECT {cols} FROM {table_name} WHERE {where_cond} = %s").format(
cols=sql.SQL(", ").join(map(sql.Identifier, columns)),
table_name = sql.Identifier('public',table),
where_cond = sql.Identifier(where_key)
)
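                    # For hypothetical arguments table="users", columns=["name"], where={"id": 1} this composes to:
                    #   SELECT "name" FROM "public"."users" WHERE "id" = %s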
log.debug(f'SQL Query: {query.as_string(self.__conn)}')
cur.execute(query,(where_value,))
r = cur.fetchall()
log.debug(f'r value = {r}')
else:
logging.debug("Triggered ONLY SELECT")
query = sql.SQL("SELECT {cols} FROM {table_name}").format(
cols=sql.SQL(", ").join(map(sql.Identifier, columns)),
table_name = sql.Identifier(table),
)
logging.debug(f"SQL QUERY: {query.as_string(self.__conn)}")
cur.execute(query)
r = cur.fetchall()
log.debug(f'r value = {r}')
except psycopg.DatabaseError as de:
log.error(f"Database error: {de}")
return None
except Exception as e:
log.critical(f"Critical error: {e}")
return None
log.debug(f"SELECT returned: {r}")
return r
    def update(self, table: str, set: dict[str, Any] | None = None, where: dict[str, Any] | None = None, pkey_name: str = 'id') -> int | str:
""" Updates a record based on the set and where conditions passed.
Args:
            table (str): Table to apply the update in
            set (dict[str, Any] | None, optional): Column/Value pairs of the values to SET. Defaults to None.
            where (dict[str, Any] | None, optional): WHERE condition for the UPDATE. Defaults to None.
            pkey_name (str, optional): Name of the primary key column returned for the updated record. Defaults to 'id'.
        Returns:
            int | str: The primary key of the updated record, or -1 on failure
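        Example:
            To set status = 'done' on the row of a hypothetical 'tasks' table whose id is 7, call
            update("tasks", set={"status": "done"}, where={"id": 7}). Equivalent of
            SQL Query "UPDATE tasks SET status = 'done' WHERE id = 7 RETURNING id;"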
"""
log.info(f"Updating record in {table} table")
log.debug(f"set: {set}")
log.debug(f"where_condition: {where}")
        # JSON-serialize any dict values because PostgreSQL does not understand Python dicts
        for k, v in set.items():
            if isinstance(v, dict):
                # Serialize non-empty dicts; store empty dicts as an empty string,
                # otherwise they end up in the database as garbage
                if bool(v):
                    set[k] = json.dumps(v)  # Override with the JSON string
                else:
                    set[k] = ""
        set_where_values = []
        # Build the SET clause ("col" = %s, ...) with sql composition
        set_clause = sql.SQL(", ").join(
            [sql.SQL("{} = {}").format(sql.Identifier(key), sql.Placeholder()) for key in set.keys()]
        )
        set_where_values.extend(set.values())
        log.debug(f'set values = {list(set.values())}')
        # Build the WHERE clause ("col" = %s AND ...) the same way
        where_clause = sql.SQL(" AND ").join(
            [sql.SQL("{} = {}").format(sql.Identifier(key), sql.Placeholder()) for key in where.keys()]
        )
        set_where_values.extend(where.values())
        log.debug(f'set_where_values = {set_where_values}')
        query = sql.SQL("UPDATE {table} SET {lhs} WHERE {col_conds} RETURNING {pkey}").format(
            table=sql.Identifier("public", table),
            lhs=set_clause,
            col_conds=where_clause,
            pkey=sql.Identifier(table, pkey_name),
        )
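        # For hypothetical arguments table="tasks", set={"status": "done"}, where={"id": 7} this composes to:
        #   UPDATE "public"."tasks" SET "status" = %s WHERE "id" = %s RETURNING "tasks"."id"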
logging.debug(f"SQL QUERY: {query.as_string(self.__conn)}")
with self.__conn.cursor() as cur:
            r: dict | None = None
            try:
                cur.execute(query, set_where_values)
                r = cur.fetchone()
                log.debug(f'the value of r is: {r}')
                if r is None:
                    raise psycopg.DatabaseError('Update affected no rows')
except psycopg.DatabaseError as de:
log.error(f"Database error: {de}")
return -1
except Exception as e:
log.critical(f"Critical error: {e}")
return -1
return r[pkey_name]
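# Illustrative usage sketch (not part of the original module): the connection
# parameters below and the 'names' table (with a 'name' text column and an 'id'
# primary key) are hypothetical placeholders assumed to exist.
if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG)
    db = PostgresManager(host="localhost", port=5432,
                         database="example_db", user="example_user",
                         password="example_password")
    db.insert("names", {"name": "sally"})
    rows = db.select("names", ["name"], where={"name": "sally"})
    log.info(f"selected rows: {rows}")
    new_id = db.update("names", set={"name": "sally2"}, where={"name": "sally"})
    log.info(f"updated record id: {new_id}")
    db.delete("names", {"name": "sally2"})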