import logging | |
import time | |
from util import config | |
import pandas as pd | |
class Postgres(object): | |
def __init__(self, env, dbnm): | |
self.config = config.init(env) | |
self.conn = None | |
self.dbnm = dbnm | |
self.__open__() | |
def __open__(self): | |
import psycopg2 | |
try: | |
if self.conn:self.__close__() | |
del self.conn | |
except Exception as e: | |
pass | |
try: | |
self.conn = psycopg2.connect(f"dbname={self.dbnm} user={self.config.get('pgdb_usr')} password={self.config.get('pgdb_pwd')} host={self.config.get('pgdb_host')} port={self.config.get('pgdb_port')}") | |
except Exception as e: | |
logging.error("Fail to connect %s "%self.config.get("pgdb_host") + str(e)) | |
def __close__(self): | |
try: | |
self.conn.close() | |
except Exception as e: | |
logging.error("Fail to close %s "%self.config.get("pgdb_host") + str(e)) | |
def select(self, sql): | |
for _ in range(10): | |
try: | |
return pd.read_sql(sql, self.conn) | |
except Exception as e: | |
logging.error(f"Fail to exec {sql} "+str(e)) | |
self.__open__() | |
time.sleep(1) | |
return pd.DataFrame() | |
def update(self, sql): | |
for _ in range(10): | |
try: | |
cur = self.conn.cursor() | |
cur.execute(sql) | |
updated_rows = cur.rowcount | |
conn.commit() | |
cur.close() | |
return updated_rows | |
except Exception as e: | |
logging.error(f"Fail to exec {sql} "+str(e)) | |
self.__open__() | |
time.sleep(1) | |
return 0 | |
if __name__ == "__main__": | |
Postgres("infiniflow", "docgpt") | |