|
import logging |
|
import time |
|
from util import config |
|
import pandas as pd |
|
|
|
class Postgres(object): |
|
def __init__(self, env, dbnm): |
|
self.config = config.init(env) |
|
self.conn = None |
|
self.dbnm = dbnm |
|
self.__open__() |
|
|
|
def __open__(self): |
|
import psycopg2 |
|
try: |
|
if self.conn:self.__close__() |
|
del self.conn |
|
except Exception as e: |
|
pass |
|
|
|
try: |
|
self.conn = psycopg2.connect(f"dbname={self.dbnm} user={self.config.get('pgdb_usr')} password={self.config.get('pgdb_pwd')} host={self.config.get('pgdb_host')} port={self.config.get('pgdb_port')}") |
|
except Exception as e: |
|
logging.error("Fail to connect %s "%self.config.get("pgdb_host") + str(e)) |
|
|
|
|
|
def __close__(self): |
|
try: |
|
self.conn.close() |
|
except Exception as e: |
|
logging.error("Fail to close %s "%self.config.get("pgdb_host") + str(e)) |
|
|
|
|
|
def select(self, sql): |
|
for _ in range(10): |
|
try: |
|
return pd.read_sql(sql, self.conn) |
|
except Exception as e: |
|
logging.error(f"Fail to exec {sql} "+str(e)) |
|
self.__open__() |
|
time.sleep(1) |
|
|
|
return pd.DataFrame() |
|
|
|
|
|
def update(self, sql): |
|
for _ in range(10): |
|
try: |
|
cur = self.conn.cursor() |
|
cur.execute(sql) |
|
updated_rows = cur.rowcount |
|
self.conn.commit() |
|
cur.close() |
|
return updated_rows |
|
except Exception as e: |
|
logging.error(f"Fail to exec {sql} "+str(e)) |
|
self.__open__() |
|
time.sleep(1) |
|
return 0 |
|
|
|
if __name__ == "__main__": |
|
Postgres("infiniflow", "docgpt") |
|
|
|
|