File size: 5,046 Bytes
91a4436
a394efd
91a4436
 
 
 
aff05a7
a394efd
91a4436
 
 
 
 
a394efd
 
 
 
 
91a4436
 
 
 
 
 
a394efd
 
 
 
91a4436
 
 
a394efd
91a4436
 
 
 
 
 
 
 
 
 
 
 
 
 
 
aff05a7
91a4436
a394efd
91a4436
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a394efd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
205d5cc
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
import csv
import re
import pandas as pd
import sqlite3
import gradio as gr
import os
from qatch.connectors.sqlite_connector import SqliteConnector
def extract_tables(file_path):
    """List the user tables stored in a SQLite database file.

    Args:
        file_path: Path of the SQLite database to inspect.

    Returns:
        A list of 1-tuples ``(table_name,)`` as returned by
        ``cursor.fetchall()``, with SQLite's internal ``sqlite_sequence``
        bookkeeping table filtered out.
    """
    conn = sqlite3.connect(file_path)
    try:
        cursor = conn.cursor()
        cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
        tabelle = cursor.fetchall()
    finally:
        # Always release the connection, even if the query fails.
        conn.close()
    return [tabella for tabella in tabelle if tabella[0] != 'sqlite_sequence']

def extract_dataframes(file_path):
    """Load every user table of a SQLite database into a DataFrame.

    Args:
        file_path: Path of the SQLite database to read.

    Returns:
        A dict mapping each table name (as listed by ``extract_tables``)
        to a DataFrame holding all rows of that table.
    """
    tabelle = extract_tables(file_path)
    conn = sqlite3.connect(file_path)
    dfs = {}
    try:
        for tabella in tabelle:
            nome_tabella = tabella[0]
            # Quote the identifier (doubling embedded quotes) so table names
            # containing spaces, punctuation or SQL keywords still work.
            quoted = '"' + nome_tabella.replace('"', '""') + '"'
            dfs[nome_tabella] = pd.read_sql_query(f"SELECT * FROM {quoted}", conn)
    finally:
        # Close the connection even when one of the reads raises.
        conn.close()
    return dfs

def carica_sqlite(file_path, db_id):
    """Bundle the tables of a SQLite file with a connector for that file.

    Args:
        file_path: Path of the SQLite database.
        db_id: Name passed to ``SqliteConnector`` as ``db_name``.

    Returns:
        A dict with two entries: ``'data_frames'`` (the result of
        ``extract_dataframes(file_path)``) and ``'db'`` (a
        ``SqliteConnector`` built for the same file).
    """
    frames = extract_dataframes(file_path)
    connector = SqliteConnector(relative_db_path=file_path, db_name=db_id)
    return {'data_frames': frames, 'db': connector}

def load_csv(file):
    """Read a CSV file into a DataFrame.

    Args:
        file: Anything ``pandas.read_csv`` accepts (path or file-like object).

    Returns:
        The DataFrame produced by ``pandas.read_csv`` with default options.
    """
    return pd.read_csv(file)

def carica_excel(file):
    """Read every sheet of an Excel workbook into a DataFrame.

    Args:
        file: Anything ``pandas.ExcelFile`` accepts (path or file-like object).

    Returns:
        A dict mapping each sheet name to the DataFrame parsed from that
        sheet, in workbook order.
    """
    # Use the workbook as a context manager so its file handle is released
    # as soon as all sheets have been parsed.
    with pd.ExcelFile(file) as xls:
        return {sheet_name: xls.parse(sheet_name) for sheet_name in xls.sheet_names}

def load_data(data_path : str, db_name : str):
    """Load a SQLite, CSV or Excel file into the common data dictionary.

    Args:
        data_path: Path of the file to load. The format is chosen from the
            file extension, matched case-insensitively.
        db_name: Database name forwarded to ``carica_sqlite`` for SQLite files.

    Returns:
        A dict with ``'data_frames'`` (table/sheet name -> DataFrame) and
        ``'db'`` (the connector for SQLite input, otherwise ``None``).

    Raises:
        gr.Error: If the extension is not ``.sqlite``, ``.csv`` or ``.xlsx``.
    """
    data_output = {'data_frames': {}, 'db': None}
    table_name = os.path.splitext(os.path.basename(data_path))[0]
    # Compare on a lower-cased copy so "DATA.CSV" is handled like "data.csv";
    # the original path is still what gets passed to the loaders.
    lowered_path = data_path.lower()
    if lowered_path.endswith(".sqlite"):
        data_output = carica_sqlite(data_path, db_name)
    elif lowered_path.endswith(".csv"):
        # A CSV holds a single table, named after the file itself.
        data_output['data_frames'] = {f"{table_name}_table": load_csv(data_path)}
    elif lowered_path.endswith(".xlsx"):
        data_output['data_frames'] = carica_excel(data_path)
    else:
        raise gr.Error("Formato file non supportato. Carica un file SQLite, CSV o Excel.")
    return data_output

def read_api(api_key_path):
    """Read an API key from a UTF-8 text file.

    Args:
        api_key_path: Path of the file containing the key.

    Returns:
        The file content with leading and trailing whitespace removed, so a
        trailing newline left by an editor does not end up inside the key.
    """
    with open(api_key_path, "r", encoding="utf-8") as file:
        return file.read().strip()

def read_models_csv(file_path):
    """Read a models CSV file into a list of row dictionaries.

    Args:
        file_path: Path of a CSV file with a header row that includes a
            ``price`` column.

    Returns:
        A list with one dict per data row (header -> value). Every value is
        a string except ``price``, which is converted to ``float``.

    Raises:
        KeyError: If the file has no ``price`` column.
        ValueError: If a ``price`` value cannot be parsed as a float.
    """
    models = []
    # Explicit UTF-8 keeps decoding independent of the platform default.
    with open(file_path, mode="r", newline="", encoding="utf-8") as file:
        for row in csv.DictReader(file):
            row["price"] = float(row["price"])
            models.append(row)
    return models

def csv_to_dict(file_path):
    """Read a UTF-8 CSV file into a list of row dictionaries.

    Args:
        file_path: Path of a CSV file with a header row.

    Returns:
        A list with one dict per data row (header -> value). When a
        ``price`` column is present its value is converted to ``float``;
        all other values stay strings.

    Raises:
        ValueError: If a ``price`` value cannot be parsed as a float.
    """
    # newline='' lets the csv module do its own newline handling, so line
    # breaks embedded in quoted fields are preserved exactly.
    with open(file_path, mode='r', encoding='utf-8', newline='') as file:
        reader = csv.DictReader(file)
        data = []
        for row in reader:
            if "price" in row:
                row["price"] = float(row["price"])
            data.append(row)
    return data


def increment_filename(filename):
    """Return *filename* with its largest embedded number increased by one.

    Every run of digits in the part before the extension is read as an
    integer. All runs equal to the largest value are replaced by that value
    plus one (so zero padding is not kept). If the name contains no digits,
    ``'1'`` is appended before the extension.
    """
    stem, suffix = os.path.splitext(filename)
    values = [int(token) for token in re.findall(r'\d+', stem)]

    if not values:
        return stem + '1' + suffix

    largest = max(values)
    replacement = str(largest + 1)

    def _bump(match):
        # Only digit runs whose value equals the maximum are rewritten.
        digits = match.group(1)
        return replacement if int(digits) == largest else digits

    return re.sub(r'(\d+)', _bump, stem) + suffix

def prepare_prompt(prompt, question, schema, samples):
    """Fill a prompt template and append sample instances.

    The ``{schema}`` placeholder is substituted first, then ``{question}``;
    finally the formatted *samples* are appended to the end of the text.
    """
    with_schema = prompt.replace("{schema}", schema)
    filled = with_schema.replace("{question}", question)
    return filled + f" Some istanze: {samples}"

def generate_some_samples(connector, tbl_name):
    """Fetch up to three rows of a table as a one-element list of text.

    Args:
        connector: Object exposing ``execute_query(sql)``.
        tbl_name: Name of the table to sample.

    Returns:
        A list with a single string: the ``str()`` of the query result, or
        ``"Error: <message>"`` if running or formatting the query raised.
    """
    query = f"SELECT * FROM {tbl_name} LIMIT 3"
    try:
        outcome = str(connector.execute_query(query))
    except Exception as e:
        # Best effort: report the failure as the sample instead of raising.
        outcome = f"Error: {e}"
    return [outcome]
def extract_tables_dict(pnp_path):
    """Collect the distinct ``tbl_name`` values of a CSV file.

    Args:
        pnp_path: Path of a UTF-8 CSV file with a header row.

    Returns:
        A dict whose keys are the ``tbl_name`` values in order of first
        appearance (``None`` if the column is missing), each mapped to an
        empty list. Filling the lists with sample rows is currently disabled.
    """
    tables_dict = {}
    with open(pnp_path, mode='r', encoding='utf-8') as handle:
        for record in csv.DictReader(handle):
            # setdefault only inserts a new empty list for unseen names.
            tables_dict.setdefault(record.get("tbl_name"), [])
    return tables_dict

def check_and_create_dir(db_path):
    """Create the folder *db_path* (with parents) unless the path exists.

    Prints a message saying whether the folder was created or was already
    present.
    """
    if os.path.exists(db_path):
        print(f"Folder already exists: {db_path}")
        return
    os.makedirs(db_path)
    print(f"Folder created: {db_path}")