import csv
import re
import pandas as pd
import sqlite3
import gradio as gr
import os
from qatch.connectors.sqlite_connector import SqliteConnector
def extract_tables(file_path):
    # List the user tables in the SQLite database, skipping SQLite's internal sqlite_sequence table.
    conn = sqlite3.connect(file_path)
    cursor = conn.cursor()
    cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
    tabelle = cursor.fetchall()
    conn.close()
    tabelle = [tabella for tabella in tabelle if tabella[0] != 'sqlite_sequence']
    return tabelle
def extract_dataframes(file_path):
    # Load every user table of the SQLite database into a pandas DataFrame, keyed by table name.
    conn = sqlite3.connect(file_path)
    tabelle = extract_tables(file_path)
    dfs = {}
    for tabella in tabelle:
        nome_tabella = tabella[0]
        df = pd.read_sql_query(f"SELECT * FROM {nome_tabella}", conn)
        dfs[nome_tabella] = df
    conn.close()
    return dfs
def carica_sqlite(file_path, db_id):
    data_output = {
        'data_frames': extract_dataframes(file_path),
        'db': SqliteConnector(relative_db_path=file_path, db_name=db_id)
    }
    return data_output
# Function to read a CSV file
def load_csv(file):
    df = pd.read_csv(file)
    return df
# Function to read an Excel file
def carica_excel(file):
    xls = pd.ExcelFile(file)
    dfs = {}
    for sheet_name in xls.sheet_names:
        dfs[sheet_name] = xls.parse(sheet_name)
    return dfs
def load_data(data_path: str, db_name: str):
    # Dispatch on the file extension and return the loaded DataFrames plus an optional connector.
    data_output = {'data_frames': {}, 'db': None}
    table_name = os.path.splitext(os.path.basename(data_path))[0]
    if data_path.endswith(".sqlite"):
        data_output = carica_sqlite(data_path, db_name)
    elif data_path.endswith(".csv"):
        data_output['data_frames'] = {f"{table_name}_table": load_csv(data_path)}
    elif data_path.endswith(".xlsx"):
        data_output['data_frames'] = carica_excel(data_path)
    else:
        raise gr.Error("Unsupported file format. Upload a SQLite, CSV or Excel file.")
    return data_output
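# Example of the returned structure (a sketch, using a hypothetical "sales.csv"):
#   load_data("sales.csv", "sales_db")
#   -> {'data_frames': {'sales_table': <pandas DataFrame>}, 'db': None}
# For a .sqlite file, 'db' is also populated with a SqliteConnector instance.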
def read_api(api_key_path):
    with open(api_key_path, "r", encoding="utf-8") as file:
        api_key = file.read().strip()  # strip the trailing newline most key files contain
    return api_key
def read_models_csv(file_path):
    # Reads a CSV file and returns its rows as a list of dictionaries
    models = []
    df = pd.read_csv(file_path)
    for _, row in df.iterrows():
        model_dict = row.to_dict()
        models.append(model_dict)
    return models
def csv_to_dict(file_path):
    with open(file_path, mode='r', encoding='utf-8') as file:
        reader = csv.DictReader(file)
        data = []
        for row in reader:
            if "price" in row:
                row["price"] = float(row["price"])
            data.append(row)
    return data
def increment_filename(filename):
    base, ext = os.path.splitext(filename)
    numbers = re.findall(r'\d+', base)
    if numbers:
        current_max = max(map(int, numbers))
        # Replace only the occurrence(s) of the largest number with that number plus one
        new_base = re.sub(r'(\d+)', lambda m: str(current_max + 1) if int(m.group(1)) == current_max else m.group(1), base)
    else:
        new_base = base + '1'
    return new_base + ext
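# Illustrative behaviour (the file names here are hypothetical):
#   increment_filename("results_2.csv") -> "results_3.csv"
#   increment_filename("results.csv")   -> "results1.csv"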
def prepare_prompt(prompt, question, schema, samples):
    prompt = prompt.replace("{schema}", schema).replace("{question}", question)
    prompt += f" Some instances: {samples}"
    return prompt
def generate_some_samples(connector, tbl_name):
    samples = []
    query = f"SELECT * FROM {tbl_name} LIMIT 3"
    try:
        sample_data = connector.execute_query(query)
        samples.append(str(sample_data))
    except Exception as e:
        samples.append(f"Error: {e}")
    return samples
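# Example wiring of the two helpers above (a sketch; "connector" is the SqliteConnector produced
# by carica_sqlite, and the template/schema strings are placeholders, not part of this module):
#   samples = generate_some_samples(connector, "customers")
#   prompt = prepare_prompt("Schema: {schema} Question: {question}", "How many rows are there?",
#                           "customers(id, name)", samples)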
def extract_tables_dict(pnp_path):
    tables_dict = {}
    # df = pd.read_csv(pnp_path)
    # with open(pnp_path, mode='r', encoding='utf-8') as file:
    #     reader = csv.DictReader(file)
    #     for row in reader:
    #         tbl_name = row.get("tbl_name")
    #         db_path = row.get("db_path")
    #         if tbl_name and db_path:
    #             print(db_path, tbl_name)
    #             connector = SqliteConnector(relative_db_path=db_path, db_name=os.path.basename(db_path))
    #             instances = generate_some_samples(connector, tbl_name)
    #             if tbl_name not in tables_dict:
    #                 tables_dict[tbl_name] = []
    #             tables_dict[tbl_name].extend(instances)
    with open(pnp_path, mode='r', encoding='utf-8') as file:
        reader = csv.DictReader(file)
        for row in reader:
            tbl_name = row.get("tbl_name")
            if tbl_name not in tables_dict:
                tables_dict[tbl_name] = []
            # tables_dict[tbl_name].append(row)
    return tables_dict
def check_and_create_dir(db_path):
    # Check if the folder exists, and create it if it doesn't
    if not os.path.exists(db_path):
        os.makedirs(db_path)
        print(f"Folder created: {db_path}")
    else:
        print(f"Folder already exists: {db_path}")