250 lines
10 KiB
Python
250 lines
10 KiB
Python
import psycopg
|
|
import logging
|
|
import json
|
|
from psycopg import sql
|
|
from psycopg.rows import dict_row, DictRow, TupleRow
|
|
from typing import Any, Dict, Optional
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
class PostgresManager:
|
|
"""Python wrapper for the psycopg API to streamline interactions with MAGNET Databases. When the object
|
|
gets deleted, it will explicitly close the instance's connection to the underlying database
|
|
|
|
Returns:
|
|
object: PostgresManager object with an ACTIVE Connection to the database.
|
|
"""
|
|
def __init__(self, host:str = "",
|
|
port:int = 5432,
|
|
database:str = "",
|
|
user:str = "",
|
|
password:str = ""):
|
|
log.debug('Creating PSQL connection...')
|
|
self.__conn = psycopg.connect(f"host={host} port={port} dbname={database} user={user} password={password}",
|
|
row_factory=dict_row,
|
|
autocommit=True)
|
|
|
|
def __del__(self):
|
|
log.info("Closing PSQL connection...")
|
|
del self.__conn
|
|
|
|
def insert(self, table:str, data:Dict[str,Any])->bool:
|
|
""" Insert into a given table the dictionary of Column/Value pairs
|
|
|
|
Args:
|
|
table (str): Table name
|
|
data (Dict[str,Any]): Column/Value dictionary (e.g { "Col1":"Value1" })
|
|
|
|
Returns:
|
|
bool: True if insert was successful, false if otherwise
|
|
Example:
|
|
To insert into the 'names' table, pass data={"name":"sally"}. Equivalent of
|
|
SQL Query 'INSERT INTO names(name) VALUES(sally);'
|
|
"""
|
|
log.info(f"inserting values into {table} table")
|
|
log.debug(f"data: {data}")
|
|
|
|
# JSON Serialize any Dicts because psql doesn't understand python dicts
|
|
for k,v in data.items():
|
|
if isinstance(v, dict):
|
|
|
|
# If it is a dict, if empty serialize otherwise do not or it will put in random garbage
|
|
if bool(v):
|
|
v = json.dumps(v)
|
|
data[k] = v # Override with json string
|
|
else:
|
|
data[k] = ""
|
|
|
|
|
|
columns = sql.SQL(", ").join(map(sql.Identifier, data.keys()))
|
|
values = [value for value in data.values()]
|
|
log.debug(f"values: {values}")
|
|
query = sql.SQL("INSERT INTO {table} ({cols}) VALUES ({placeholders})").format(
|
|
table=sql.Identifier(table),
|
|
cols=columns,
|
|
placeholders=sql.SQL(", ").join(sql.Placeholder() * len(data.values())))
|
|
|
|
log.debug(f'SQL Query: {query.as_string(self.__conn)}')
|
|
with self.__conn.cursor() as cur:
|
|
try:
|
|
cur.execute(query,values)
|
|
|
|
except psycopg.DatabaseError as de:
|
|
log.error(f"Database error: {de}")
|
|
return False
|
|
|
|
except Exception as e:
|
|
log.critical(f"Critical error: {e}")
|
|
return False
|
|
|
|
|
|
#self.conn.commit()
|
|
logging.info("Insert successful")
|
|
logging.debug("Exiting DB insert...")
|
|
return True
|
|
|
|
def delete(self, table:str, where:dict[str,Any])->bool:
|
|
""" DELETES from a given table.
|
|
|
|
Args:
|
|
table (str): Table to delete from
|
|
where (dict[str,Any]): where condition that represents LEFT_HAND = RIGHT_HAND.
|
|
example: if you want to say 'DELETE FROM orchid WHERE fruit = apple', you could pass where={"fruit":"apple"}
|
|
|
|
Returns:
|
|
bool: true if the transaction was successful, false if otherwise
|
|
"""
|
|
log.info(f"Deleting from {table} table")
|
|
log.debug(f"where_condition: {where}")
|
|
col = list(where.keys())[0]
|
|
query = sql.SQL("DELETE FROM {table} WHERE {col_cond} = %s").format(
|
|
table=sql.Identifier(table),
|
|
col_cond=sql.Identifier(table,list(where.keys())[0])
|
|
)
|
|
|
|
log.debug(f'SQL Query: {query.as_string(self.__conn)}')
|
|
with self.__conn.cursor() as cur:
|
|
try:
|
|
cur.execute(query,[where.get(col)])
|
|
|
|
except psycopg.DatabaseError as de:
|
|
log.error(f"Database error: {de}")
|
|
return False
|
|
|
|
except Exception as e:
|
|
log.critical(f"Critical error: {e}")
|
|
return False
|
|
|
|
|
|
#self.conn.commit()
|
|
return True
|
|
|
|
def select(self, table:str, columns:list, where:Dict[str,Any] | None = None)->DictRow | None:
|
|
""" A select query that retrieves values from passed columns and accept a where condition
|
|
|
|
Args:
|
|
table (str): Table to select values from
|
|
columns (list): The column names of the values you want from
|
|
where (Dict[str,Any] | None, optional): A dictionary representing LEFT_HAND = RIGHT_HAND where condition
|
|
|
|
Returns:
|
|
Union[Dict[str,Any]|None]: A [Dict[str,Any]] if transaction was successful, None if otherwise.
|
|
"""
|
|
logging.debug(f"Table: {table}")
|
|
logging.debug(f'Where Condition: {where}')
|
|
|
|
|
|
with self.__conn.cursor() as cur:
|
|
try:
|
|
r = []
|
|
if where:
|
|
logging.debug("Triggered SELECT with WHERE")
|
|
where_key = list(where.keys())[0]
|
|
where_value = where.get(where_key)
|
|
logging.debug(f'Where_cond key = {where_key}, value = {where.get(where_key)}')
|
|
query = sql.SQL("SELECT {cols} FROM {table_name} WHERE {where_cond} = %s").format(
|
|
cols=sql.SQL(", ").join(map(sql.Identifier, columns)),
|
|
table_name = sql.Identifier('public',table),
|
|
where_cond = sql.Identifier(where_key)
|
|
)
|
|
log.debug(f'SQL Query: {query.as_string(self.__conn)}')
|
|
|
|
cur.execute(query,(where_value,))
|
|
r = cur.fetchall()
|
|
log.debug(f'r value = {r}')
|
|
else:
|
|
logging.debug("Triggered ONLY SELECT")
|
|
query = sql.SQL("SELECT {cols} FROM {table_name}").format(
|
|
cols=sql.SQL(", ").join(map(sql.Identifier, columns)),
|
|
table_name = sql.Identifier(table),
|
|
)
|
|
logging.debug(f"SQL QUERY: {query.as_string(self.__conn)}")
|
|
cur.execute(query)
|
|
r = cur.fetchall()
|
|
log.debug(f'r value = {r}')
|
|
except psycopg.DatabaseError as de:
|
|
log.error(f"Database error: {de}")
|
|
return None
|
|
|
|
except Exception as e:
|
|
log.critical(f"Critical error: {e}")
|
|
return None
|
|
|
|
log.debug(f"SELECT returned: {r}")
|
|
return r
|
|
|
|
def update(self, table, set: dict[str, Any]={}, where: dict[str, Any]={}, pkey_name: str='id')->int | str:
|
|
""" Updates a record based on the set and where conditions passed.
|
|
|
|
Args:
|
|
table (_type_): Table to apply the update in
|
|
set (dict[str, Any], optional): Column/Value pair, of what update values to SET. Defaults to {}.
|
|
where (dict[str, Any], optional): where condition to do the UPDATE. Defaults to {}.
|
|
|
|
Returns:
|
|
int | str: Returns the ID of the updated record
|
|
"""
|
|
log.info(f"Updating record in {table} table")
|
|
log.debug(f"set: {set}")
|
|
log.debug(f"where_condition: {where}")
|
|
|
|
# JSON Serialize any Dicts because psql doesn't understand python dicts
|
|
for k,v in set.items():
|
|
if isinstance(v, dict):
|
|
# If not an empty dict then serialize otherwise it will put in random garbage
|
|
if bool(v):
|
|
v = json.dumps(v)
|
|
set[k] = v # Override with json string
|
|
else:
|
|
set[k] = ""
|
|
|
|
col = list(where.keys())[0]
|
|
set_clause_strings = []
|
|
log.debug(f'set values = {set.values()}')
|
|
where_clause_strings = []
|
|
set_where_values = []
|
|
|
|
# Build the SET key = %s string
|
|
for key,value in set.items():
|
|
set_clause_strings.append(f"{sql.Identifier(key).as_string()} = {sql.Placeholder().as_string()}")
|
|
set_where_values.append(value)
|
|
logging.debug(f'set clause = {set_clause_strings}')
|
|
|
|
if len(where.keys()) == 1:
|
|
for key,value in where.items():
|
|
where_clause_strings.append(f"{sql.Identifier(key).as_string()} = {sql.Placeholder().as_string()}")
|
|
set_where_values.append(value)
|
|
else:
|
|
for key,value in where.items():
|
|
where_clause_strings.append(f"{sql.Identifier(key).as_string()} = {sql.Placeholder().as_string()}")
|
|
set_where_values.append(value)
|
|
logging.debug(f'where clause = {where_clause_strings}')
|
|
|
|
logging.debug(f'set_where_values = {set_where_values}')
|
|
|
|
query = sql.SQL("UPDATE {table} SET {lhs} WHERE {col_conds} RETURNING {pkey}").format(
|
|
table=sql.Identifier("public",table),
|
|
lhs=sql.SQL(', '.join(set for set in set_clause_strings)),
|
|
col_conds=sql.SQL(' AND '.join(where for where in where_clause_strings)),
|
|
pkey=sql.Identifier(table,pkey_name),
|
|
)
|
|
|
|
logging.debug(f"SQL QUERY: {query.as_string(self.__conn)}")
|
|
with self.__conn.cursor() as cur:
|
|
r: dict = {}
|
|
try:
|
|
cur.execute(query,set_where_values)
|
|
r = cur.fetchone()
|
|
logging.debug(f'the value of r is: {r}')
|
|
if r is None:
|
|
raise psycopg.DatabaseError('Update affected no rows')
|
|
|
|
except psycopg.DatabaseError as de:
|
|
log.error(f"Database error: {de}")
|
|
return -1
|
|
|
|
except Exception as e:
|
|
log.critical(f"Critical error: {e}")
|
|
return -1
|
|
return r[pkey_name]
|