import csv
import re
import pandas as pd
import sqlite3
import gradio as gr
import os
from qatch.connectors.sqlite_connector import SqliteConnector
def extract_tables(file_path):
    # Return the names of the user tables in the SQLite database at file_path
    conn = sqlite3.connect(file_path)
    cursor = conn.cursor()
    cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
    tabelle = cursor.fetchall()
    conn.close()
    tabelle = [tabella for tabella in tabelle if tabella[0] != 'sqlite_sequence']
    return tabelle
def extract_dataframes(file_path):
    # Load every table of the SQLite database into a pandas DataFrame
    conn = sqlite3.connect(file_path)
    tabelle = extract_tables(file_path)
    dfs = {}
    for tabella in tabelle:
        nome_tabella = tabella[0]
        df = pd.read_sql_query(f"SELECT * FROM {nome_tabella}", conn)
        dfs[nome_tabella] = df
    conn.close()
    return dfs
def carica_sqlite(file_path, db_id):
    # Build both the raw DataFrames and a QATCH SqliteConnector for the database
    data_output = {
        'data_frames': extract_dataframes(file_path),
        'db': SqliteConnector(relative_db_path=file_path, db_name=db_id)
    }
    return data_output
# Function to read a CSV file
def load_csv(file):
    df = pd.read_csv(file)
    return df
# Function to read an Excel file (one DataFrame per sheet)
def carica_excel(file):
    xls = pd.ExcelFile(file)
    dfs = {}
    for sheet_name in xls.sheet_names:
        dfs[sheet_name] = xls.parse(sheet_name)
    return dfs
def load_data(data_path: str, db_name: str):
    # Dispatch on the file extension and load the data accordingly
    data_output = {'data_frames': {}, 'db': None}
    table_name = os.path.splitext(os.path.basename(data_path))[0]
    if data_path.endswith(".sqlite"):
        data_output = carica_sqlite(data_path, db_name)
    elif data_path.endswith(".csv"):
        data_output['data_frames'] = {f"{table_name}_table": load_csv(data_path)}
    elif data_path.endswith(".xlsx"):
        data_output['data_frames'] = carica_excel(data_path)
    else:
        raise gr.Error("Unsupported file format. Upload a SQLite, CSV, or Excel file.")
    return data_output
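
# Illustrative usage sketch, not part of the module: the file names and db name below
# are hypothetical placeholders.
#   load_data("sales.csv", "sales_db")['data_frames']   -> {"sales_table": <DataFrame>}
#   load_data("books.sqlite", "books_db")['db']         -> SqliteConnector instance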
def read_api(api_key_path):
    with open(api_key_path, "r", encoding="utf-8") as file:
        api_key = file.read()
    return api_key
def read_models_csv(file_path):
    # Read a CSV file and return a list of dictionaries, one per model
    models = []
    with open(file_path, mode="r", newline="") as file:
        reader = csv.DictReader(file)
        for row in reader:
            row["price"] = float(row["price"])  # Convert price to float
            models.append(row)
    return models
def csv_to_dict(file_path):
    with open(file_path, mode='r', encoding='utf-8') as file:
        reader = csv.DictReader(file)
        data = []
        for row in reader:
            if "price" in row:
                row["price"] = float(row["price"])
            data.append(row)
    return data
def increment_filename(filename):
    # Increment the largest number embedded in the base name; if the name contains
    # no number, append "1" before the extension.
    base, ext = os.path.splitext(filename)
    numbers = re.findall(r'\d+', base)
    if numbers:
        old_max = max(map(int, numbers))
        new_base = re.sub(r'(\d+)', lambda m: str(old_max + 1) if int(m.group(1)) == old_max else m.group(1), base)
    else:
        new_base = base + '1'
    return new_base + ext
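
# Illustrative examples (the file names are hypothetical):
#   increment_filename("results_2.csv")  -> "results_3.csv"
#   increment_filename("report.txt")     -> "report1.txt"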
def prepare_prompt(prompt, question, schema, samples):
    # Fill the {schema} and {question} placeholders and append a few sample rows
    prompt = prompt.replace("{schema}", schema).replace("{question}", question)
    prompt += f" Some instances: {samples}"
    return prompt
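
# Illustrative usage (the template, schema, and question strings are hypothetical):
#   template = "Schema: {schema}\nQuestion: {question}"
#   prepare_prompt(template, "How many rows?", "CREATE TABLE t(a INT)", ["(1,), (2,)"])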
def generate_some_samples(connector, tbl_name):
    # Fetch up to three sample rows from the table to include in the prompt
    samples = []
    query = f"SELECT * FROM {tbl_name} LIMIT 3"
    try:
        sample_data = connector.execute_query(query)
        samples.append(str(sample_data))
    except Exception as e:
        samples.append(f"Error: {e}")
    return samples
def extract_tables_dict(pnp_path):
    # Map each tbl_name found in the CSV at pnp_path to an (initially empty) list
    tables_dict = {}
    # Previous implementation, kept for reference: it also attached sample rows per table.
    # df = pd.read_csv(pnp_path)
    # with open(pnp_path, mode='r', encoding='utf-8') as file:
    #     reader = csv.DictReader(file)
    #     for row in reader:
    #         tbl_name = row.get("tbl_name")
    #         db_path = row.get("db_path")
    #         if tbl_name and db_path:
    #             print(db_path, tbl_name)
    #             connector = SqliteConnector(relative_db_path=db_path, db_name=os.path.basename(db_path))
    #             instances = generate_some_samples(connector, tbl_name)
    #             if tbl_name not in tables_dict:
    #                 tables_dict[tbl_name] = []
    #             tables_dict[tbl_name].extend(instances)
    with open(pnp_path, mode='r', encoding='utf-8') as file:
        reader = csv.DictReader(file)
        for row in reader:
            tbl_name = row.get("tbl_name")
            if tbl_name not in tables_dict:
                tables_dict[tbl_name] = []
            # tables_dict[tbl_name].append(row)
    return tables_dict
def check_and_create_dir(db_path):
    # Check if the folder exists, and create it if it doesn't
    if not os.path.exists(db_path):
        os.makedirs(db_path)
        print(f"Folder created: {db_path}")
    else:
        print(f"Folder already exists: {db_path}")