# qatch-demo / utilities.py
# Commit: "fix csv_reading" (d44b620) - simone-papicchio
import csv
import re
import pandas as pd
import sqlite3
import gradio as gr
import os
from qatch.connectors.sqlite_connector import SqliteConnector
def extract_tables(file_path):
    """Return the names of the tables stored in a SQLite database file.

    Args:
        file_path: Path to the SQLite database file.

    Returns:
        A list of 1-tuples ``(table_name,)`` exactly as produced by
        ``cursor.fetchall()``, with SQLite's internal ``sqlite_sequence``
        bookkeeping table filtered out.
    """
    conn = sqlite3.connect(file_path)
    try:
        cursor = conn.cursor()
        cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
        tables = cursor.fetchall()
    finally:
        # Always release the connection; previously it was left open,
        # leaking one file handle per call.
        conn.close()
    return [table for table in tables if table[0] != 'sqlite_sequence']
def extract_dataframes(file_path):
    """Load every table of a SQLite database file into a pandas DataFrame.

    Args:
        file_path: Path to the SQLite database file.

    Returns:
        A dict mapping each table name (as listed by ``extract_tables``) to
        a DataFrame holding all rows of that table.
    """
    tables = extract_tables(file_path)
    dfs = {}
    conn = sqlite3.connect(file_path)
    try:
        for table in tables:
            table_name = table[0]
            # Quote the identifier (doubling any embedded double quote) so
            # that table names containing spaces, punctuation or SQL
            # keywords do not break the statement.
            quoted_name = '"' + table_name.replace('"', '""') + '"'
            dfs[table_name] = pd.read_sql_query(f"SELECT * FROM {quoted_name}", conn)
    finally:
        # Close the connection even when one of the queries fails.
        conn.close()
    return dfs
def carica_sqlite(file_path, db_id):
    """Load a SQLite database as DataFrames together with a connector.

    Args:
        file_path: Path to the SQLite database file.
        db_id: Name passed to ``SqliteConnector`` as ``db_name``.

    Returns:
        A dict with two entries: ``'data_frames'`` (the result of
        ``extract_dataframes(file_path)``) and ``'db'`` (a
        ``SqliteConnector`` built for the same file).
    """
    frames = extract_dataframes(file_path)
    connector = SqliteConnector(relative_db_path=file_path, db_name=db_id)
    return {'data_frames': frames, 'db': connector}
def load_csv(file):
    """Read a CSV file into a pandas DataFrame.

    Args:
        file: Path or file-like object, forwarded unchanged to
            ``pandas.read_csv``.

    Returns:
        The DataFrame produced by ``pandas.read_csv`` with default options.
    """
    return pd.read_csv(file)
def carica_excel(file):
    """Read every sheet of an Excel workbook into a pandas DataFrame.

    Args:
        file: Path or file-like object, forwarded to ``pandas.ExcelFile``.

    Returns:
        A dict mapping each sheet name to the DataFrame parsed from that
        sheet, in workbook order.
    """
    # Use the workbook as a context manager so its underlying file handle
    # is closed once all sheets are parsed (it was previously left open).
    with pd.ExcelFile(file) as xls:
        return {sheet_name: xls.parse(sheet_name) for sheet_name in xls.sheet_names}
def load_data(data_path: str, db_name: str):
    """Load a SQLite, CSV or Excel file into the common data structure.

    Args:
        data_path: Path of the file to load. The format is chosen from the
            file extension, compared case-insensitively.
        db_name: Database name, used only for SQLite files.

    Returns:
        A dict with the keys ``'data_frames'`` and ``'db'``:
        * ``.sqlite``: whatever ``carica_sqlite`` returns.
        * ``.csv``: one DataFrame stored under ``"<file stem>_table"``;
          ``'db'`` is ``None``.
        * ``.xlsx``: one DataFrame per sheet; ``'db'`` is ``None``.

    Raises:
        gr.Error: If the extension is none of the supported ones.
    """
    data_output = {'data_frames': {}, 'db': None}
    table_name = os.path.splitext(os.path.basename(data_path))[0]
    # Compare against a lower-cased copy so "DATA.CSV" or "db.SQLITE" are
    # accepted too; the original path is still what gets opened.
    lowered_path = data_path.lower()
    if lowered_path.endswith(".sqlite"):
        data_output = carica_sqlite(data_path, db_name)
    elif lowered_path.endswith(".csv"):
        data_output['data_frames'] = {f"{table_name}_table": load_csv(data_path)}
    elif lowered_path.endswith(".xlsx"):
        data_output['data_frames'] = carica_excel(data_path)
    else:
        raise gr.Error("Formato file non supportato. Carica un file SQLite, CSV o Excel.")
    return data_output
def read_api(api_key_path):
    """Read an API key from a UTF-8 text file.

    Args:
        api_key_path: Path to the file containing the key.

    Returns:
        The file content with leading and trailing whitespace removed.
    """
    with open(api_key_path, "r", encoding="utf-8") as file:
        # Key files usually end with a newline added by the editor; strip
        # it so the returned value is the key alone.
        return file.read().strip()
def read_models_csv(file_path):
    """Read a CSV file and return its rows as a list of dictionaries.

    Args:
        file_path: Path or file-like object, forwarded to
            ``pandas.read_csv``.

    Returns:
        One dict per CSV row, keyed by column name, in file order. An
        empty list when the file has no data rows.
    """
    frame = pd.read_csv(file_path)
    return [record.to_dict() for _, record in frame.iterrows()]
def csv_to_dict(file_path):
    """Read a UTF-8 CSV file into a list of row dictionaries.

    Args:
        file_path: Path to the CSV file; the first line is the header.

    Returns:
        One dict per data row, keyed by column name. All values are
        strings, except ``"price"`` which, when that column is present, is
        converted to ``float``.

    Raises:
        ValueError: If a ``"price"`` value cannot be parsed as a float.
    """
    # newline='' hands newline handling to the csv module, as its
    # documentation requires; otherwise line breaks embedded in quoted
    # fields are rewritten by the text layer.
    with open(file_path, mode='r', encoding='utf-8', newline='') as file:
        data = []
        for row in csv.DictReader(file):
            if "price" in row:
                row["price"] = float(row["price"])
            data.append(row)
    return data
def increment_filename(filename):
    """Return *filename* with the largest number in its base name bumped by one.

    Only the final path component (without its extension) is inspected, so
    digits in directory names are never modified. Every occurrence of the
    largest number in the base name is replaced by that number plus one;
    if the base name holds no digits, ``'1'`` is appended to it.

    Args:
        filename: A file name, optionally preceded by a directory path.

    Returns:
        The new file name, with the same directory prefix and extension.

    Examples:
        ``file1.txt`` -> ``file2.txt``; ``file.txt`` -> ``file1.txt``;
        ``a2_b10.txt`` -> ``a2_b11.txt``.
    """
    # Split off the directory part verbatim so it is returned untouched.
    name = os.path.basename(filename)
    prefix = filename[:len(filename) - len(name)]
    base, ext = os.path.splitext(name)

    numbers = [int(digits) for digits in re.findall(r'\d+', base)]
    if numbers:
        # Compute the maximum once instead of once per regex match.
        current_max = max(numbers)
        bumped = str(current_max + 1)
        new_base = re.sub(
            r'\d+',
            lambda m: bumped if int(m.group()) == current_max else m.group(),
            base,
        )
    else:
        new_base = base + '1'
    return prefix + new_base + ext
def prepare_prompt(prompt, question, schema, samples):
    """Fill a prompt template and append the sample rows to it.

    Args:
        prompt: Template text; ``{schema}`` and ``{question}`` placeholders
            are substituted (schema first, then question).
        question: Text replacing every ``{question}`` placeholder.
        schema: Text replacing every ``{schema}`` placeholder.
        samples: Value formatted into the trailing samples sentence.

    Returns:
        The filled-in template followed by `` Some istanze: <samples>``.
    """
    with_schema = prompt.replace("{schema}", schema)
    filled = with_schema.replace("{question}", question)
    return f"{filled} Some istanze: {samples}"
def generate_some_samples(connector, tbl_name):
    """Fetch up to three rows of a table and return them as text.

    Args:
        connector: Object exposing ``execute_query(sql)``.
        tbl_name: Table name, interpolated as-is into the SQL statement.

    Returns:
        A one-element list: the ``str()`` of the query result, or
        ``"Error: <exception>"`` when running the query raised.
    """
    sql = f"SELECT * FROM {tbl_name} LIMIT 3"
    try:
        preview = str(connector.execute_query(sql))
    except Exception as exc:
        # Best effort: report the failure as the sample instead of raising.
        preview = f"Error: {exc}"
    return [preview]
def extract_tables_dict(pnp_path):
    """Collect the distinct ``tbl_name`` values of a CSV file.

    Args:
        pnp_path: Path to a UTF-8 CSV file with a header row.

    Returns:
        A dict whose keys are the distinct values of the ``tbl_name``
        column, in order of first appearance, each mapped to an empty list.
        Rows without a ``tbl_name`` value are grouped under the ``None`` key.
    """
    # NOTE: the lists are never filled here. The code that populated them
    # with sample rows was disabled and has been removed.
    tables_dict = {}
    # newline='' hands newline handling to the csv module, as its
    # documentation requires.
    with open(pnp_path, mode='r', encoding='utf-8', newline='') as file:
        for row in csv.DictReader(file):
            tables_dict.setdefault(row.get("tbl_name"), [])
    return tables_dict
def check_and_create_dir(db_path):
    """Create the folder *db_path* if it does not exist yet.

    Missing parent folders are created as well. A message saying whether
    the folder was created or already existed is printed.

    Args:
        db_path: Path of the folder to ensure.
    """
    if os.path.exists(db_path):
        print(f"Folder already exists: {db_path}")
        return
    # exist_ok=True avoids a FileExistsError if another process creates the
    # folder between the check above and this call.
    os.makedirs(db_path, exist_ok=True)
    print(f"Folder created: {db_path}")