import json
import logging
from typing import Any, Dict, Optional

import psycopg
from psycopg import sql
from psycopg.rows import dict_row, DictRow, TupleRow

log = logging.getLogger(__name__)


class PostgresManager:
    """Python wrapper for the psycopg API to streamline interactions with MAGNET Databases.

    The connection is opened in ``__init__`` with ``autocommit=True`` and the
    ``dict_row`` row factory, so every statement is committed immediately and
    rows come back as dictionaries keyed by column name.

    Call :meth:`close` when done. As a fallback, the connection is also closed
    when the object is garbage collected.
    """

    def __init__(self, host: str = "", port: int = 5432, database: str = "", user: str = "", password: str = ""):
        """Open an autocommit connection to the database.

        Args:
            host (str): Server host name or address.
            port (int): Server port.
            database (str): Database name.
            user (str): Role to connect as.
            password (str): Password for ``user``.
        """
        log.debug('Creating PSQL connection...')
        settings = {
            "host": host,
            "port": port,
            "dbname": database,
            "user": user,
            "password": password,
        }
        # Pass the settings as keyword arguments so psycopg escapes them:
        # interpolating them into a "key=value ..." string breaks on values
        # containing spaces, quotes or backslashes. Empty settings are left
        # out so the driver's own defaults apply instead of an empty value.
        settings = {key: value for key, value in settings.items() if value not in ("", None)}
        self.__conn = psycopg.connect(row_factory=dict_row, autocommit=True, **settings)

    def __del__(self):
        self.close()

    def close(self) -> None:
        """Close the underlying connection. Safe to call more than once."""
        try:
            conn = self.__conn
        except AttributeError:
            # __init__ failed before the connection was stored; nothing to close.
            return
        log.info("Closing PSQL connection...")
        conn.close()

    @staticmethod
    def _serialize_dicts(values: Dict[str, Any]) -> Dict[str, Any]:
        """Return a copy of ``values`` with dict entries converted for storage.

        Non-empty dicts become JSON strings and empty dicts become ``""``.
        All other values are passed through. The input is not modified.
        """
        converted: Dict[str, Any] = {}
        for key, value in values.items():
            if isinstance(value, dict):
                converted[key] = json.dumps(value) if value else ""
            else:
                converted[key] = value
        return converted

    @staticmethod
    def _equality_clause(pairs: Dict[str, Any], separator: str, qualifier: Optional[str] = None):
        """Build ``"col" = %s`` fragments joined by ``separator``.

        Args:
            pairs (Dict[str, Any]): Column/Value pairs.
            separator (str): Text placed between fragments (e.g. ``", "`` or ``" AND "``).
            qualifier (Optional[str]): When given, columns are emitted as ``"qualifier"."col"``.

        Returns:
            tuple: The composed SQL fragment and the list of values, in matching order.
        """
        fragments = []
        for column in pairs:
            identifier = sql.Identifier(qualifier, column) if qualifier else sql.Identifier(column)
            fragments.append(sql.SQL("{} = {}").format(identifier, sql.Placeholder()))
        return sql.SQL(separator).join(fragments), list(pairs.values())

    def _execute(self, query: sql.Composable, params: Optional[list]) -> bool:
        """Run a statement that returns no rows; report success as a bool."""
        with self.__conn.cursor() as cur:
            try:
                cur.execute(query, params)
            except psycopg.DatabaseError as de:
                log.error("Database error: %s", de)
                return False
            except Exception as e:
                # Deliberate catch-all: callers rely on a bool, not an exception.
                log.critical("Critical error: %s", e)
                return False
        return True

    def insert(self, table: str, data: Dict[str, Any]) -> bool:
        """Insert into a given table the dictionary of Column/Value pairs.

        Dict values are JSON-serialized before being sent (an empty dict is
        sent as an empty string). The caller's ``data`` is left untouched.

        Args:
            table (str): Table name
            data (Dict[str,Any]): Column/Value dictionary (e.g { "Col1":"Value1" })

        Returns:
            bool: True if insert was successful, False otherwise (including
            when ``data`` is empty).

        Example:
            To insert into the 'names' table, pass data={"name":"sally"}.
            Equivalent of SQL Query "INSERT INTO names(name) VALUES('sally');"
        """
        log.info("inserting values into %s table", table)
        log.debug("data: %s", data)

        if not data:
            # "INSERT INTO t () VALUES ()" is not valid SQL; fail without a round trip.
            log.error("Insert into %s table rejected: no column/value pairs supplied", table)
            return False

        row = self._serialize_dicts(data)
        values = list(row.values())
        log.debug("values: %s", values)

        query = sql.SQL("INSERT INTO {table} ({cols}) VALUES ({placeholders})").format(
            table=sql.Identifier(table),
            cols=sql.SQL(", ").join(map(sql.Identifier, row)),
            placeholders=sql.SQL(", ").join(sql.Placeholder() * len(row)),
        )
        log.debug("SQL Query: %s", query.as_string(self.__conn))

        if not self._execute(query, values):
            return False

        log.info("Insert successful")
        log.debug("Exiting DB insert...")
        return True

    def delete(self, table: str, where: dict[str, Any]) -> bool:
        """Delete rows from a given table.

        Args:
            table (str): Table to delete from
            where (dict[str,Any]): Column/Value pairs, each meaning
                LEFT_HAND = RIGHT_HAND. Multiple pairs are combined with AND.
                example: for "DELETE FROM orchard WHERE fruit = 'apple'",
                pass where={"fruit":"apple"}

        Returns:
            bool: True if the statement ran successfully, False otherwise
            (including when ``where`` is empty).
        """
        log.info("Deleting from %s table", table)
        log.debug("where_condition: %s", where)

        if not where:
            log.error("Delete from %s table rejected: no where condition supplied", table)
            return False

        # Every pair takes part in the filter; using only the first one would
        # delete more rows than the caller asked for.
        conditions, params = self._equality_clause(where, " AND ", qualifier=table)
        query = sql.SQL("DELETE FROM {table} WHERE {col_cond}").format(
            table=sql.Identifier(table),
            col_cond=conditions,
        )
        log.debug("SQL Query: %s", query.as_string(self.__conn))

        return self._execute(query, params)

    def select(self, table: str, columns: list, where: Dict[str, Any] | None = None) -> list[DictRow] | None:
        """Retrieve the given columns from a table, optionally filtered.

        Args:
            table (str): Table to select values from
            columns (list): The column names of the values you want
            where (Dict[str,Any] | None, optional): Column/Value pairs, each
                meaning LEFT_HAND = RIGHT_HAND. Multiple pairs are combined
                with AND.

        Returns:
            list[DictRow] | None: All matching rows (possibly an empty list)
            if the query ran, None if it failed.
        """
        log.debug("Table: %s", table)
        log.debug("Where Condition: %s", where)

        with self.__conn.cursor() as cur:
            try:
                column_list = sql.SQL(", ").join(map(sql.Identifier, columns))
                if where:
                    log.debug("Triggered SELECT with WHERE")
                    conditions, params = self._equality_clause(where, " AND ")
                    # NOTE(review): the filtered form pins the "public" schema
                    # while the unfiltered form below does not; kept as-is.
                    query = sql.SQL("SELECT {cols} FROM {table_name} WHERE {where_cond}").format(
                        cols=column_list,
                        table_name=sql.Identifier('public', table),
                        where_cond=conditions,
                    )
                    log.debug("SQL Query: %s", query.as_string(self.__conn))
                    cur.execute(query, params)
                else:
                    log.debug("Triggered ONLY SELECT")
                    query = sql.SQL("SELECT {cols} FROM {table_name}").format(
                        cols=column_list,
                        table_name=sql.Identifier(table),
                    )
                    log.debug("SQL QUERY: %s", query.as_string(self.__conn))
                    cur.execute(query)
                rows = cur.fetchall()
                log.debug("r value = %s", rows)
            except psycopg.DatabaseError as de:
                log.error("Database error: %s", de)
                return None
            except Exception as e:
                # Deliberate catch-all: callers rely on None, not an exception.
                log.critical("Critical error: %s", e)
                return None

        log.debug("SELECT returned: %s", rows)
        return rows

    def update(self, table, set: dict[str, Any] | None = None, where: dict[str, Any] | None = None, pkey_name: str = 'id') -> int | str:
        """Update records based on the set and where conditions passed.

        Dict values in ``set`` are JSON-serialized before being sent (an empty
        dict is sent as an empty string). The caller's ``set`` is left untouched.

        Args:
            table (str): Table to apply the update in
            set (dict[str, Any], optional): Column/Value pairs of what to SET.
            where (dict[str, Any], optional): Column/Value pairs, each meaning
                LEFT_HAND = RIGHT_HAND. Multiple pairs are combined with AND.
            pkey_name (str, optional): Column returned to identify the updated
                record. Defaults to 'id'.

        Returns:
            int | str: The ``pkey_name`` value of the first updated row, or -1
            if the update failed, matched no rows, or ``set``/``where`` is empty.
        """
        log.info("Updating record in %s table", table)
        log.debug("set: %s", set)
        log.debug("where_condition: %s", where)

        if not set or not where:
            log.error("Update of %s table rejected: both set and where must be non-empty", table)
            return -1

        assignments, set_where_values = self._equality_clause(self._serialize_dicts(set), ", ")
        conditions, where_values = self._equality_clause(where, " AND ")
        # Parameter order must follow placeholder order: SET first, then WHERE.
        set_where_values.extend(where_values)
        log.debug("set_where_values = %s", set_where_values)

        # NOTE(review): the "public" schema is pinned here but not in
        # insert/delete; kept as-is.
        query = sql.SQL("UPDATE {table} SET {lhs} WHERE {col_conds} RETURNING {pkey}").format(
            table=sql.Identifier("public", table),
            lhs=assignments,
            col_conds=conditions,
            pkey=sql.Identifier(table, pkey_name),
        )
        log.debug("SQL QUERY: %s", query.as_string(self.__conn))

        with self.__conn.cursor() as cur:
            try:
                cur.execute(query, set_where_values)
                row = cur.fetchone()
            except psycopg.DatabaseError as de:
                log.error("Database error: %s", de)
                return -1
            except Exception as e:
                # Deliberate catch-all: callers rely on -1, not an exception.
                log.critical("Critical error: %s", e)
                return -1

        log.debug("the value of r is: %s", row)
        if row is None:
            log.error("Database error: %s", 'Update affected no rows')
            return -1
        return row[pkey_name]