# NOTE: captured from a Hugging Face Spaces page; its status banner read "Build error".
import os | |
import gradio as gr | |
from sqlalchemy import text | |
from smolagents import CodeAgent, HfApiModel | |
import pandas as pd | |
from io import StringIO | |
import tempfile | |
from database import ( | |
engine, | |
create_dynamic_table, | |
clear_database, | |
insert_rows_into_table | |
) | |
# Build the module-level agent once at import time. It is created with no
# extra tools and is backed by the Qwen2.5-Coder-32B-Instruct model.
_agent_model = HfApiModel(model_id="Qwen/Qwen2.5-Coder-32B-Instruct")
agent = CodeAgent(tools=[], model=_agent_model)
def get_data_table():
    """Fetch the contents of the first user table as a DataFrame.

    The table list is read from ``sqlite_master`` (internal ``sqlite_*``
    tables are excluded) and only the first table found is loaded.

    Returns:
        pandas.DataFrame: the table rows with their column names; an empty
        DataFrame when there is no table or the table has no rows; or a
        one-row DataFrame with a single ``Error`` column holding the
        exception text if anything fails.
    """
    try:
        # One connection is enough for both the lookup and the read.
        with engine.connect() as con:
            tables = con.execute(text(
                "SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'"
            )).fetchall()
            if not tables:
                return pd.DataFrame()

            # Identifiers cannot be bound as parameters, so quote the name
            # (doubling embedded quotes) to survive spaces, reserved words
            # and quote characters in dynamically created table names.
            table_name = str(tables[0][0])
            quoted_name = '"' + table_name.replace('"', '""') + '"'

            result = con.execute(text(f"SELECT * FROM {quoted_name}"))
            columns = list(result.keys())
            rows = result.fetchall()

        return pd.DataFrame(rows, columns=columns) if rows else pd.DataFrame()
    except Exception as e:
        # Callers expect a DataFrame in every case, so report the failure as data.
        return pd.DataFrame({"Error": [str(e)]})
def _strip_code_fences(raw):
    """Return *raw* without a surrounding Markdown code fence, if present."""
    lines = raw.strip().splitlines()
    if lines and lines[0].lstrip().startswith("```"):
        lines = lines[1:]
        if lines and lines[-1].strip().startswith("```"):
            lines = lines[:-1]
    return "\n".join(lines)


def process_txt_file(file_path):
    """Analyze a text file and convert it to a structured table.

    The file content is sent to the agent with a request to return CSV. The
    reply is parsed with pandas, the database is cleared, and the parsed rows
    are stored in a newly created table.

    Args:
        file_path: path of the uploaded text file.

    Returns:
        tuple: ``(success, message, preview)`` where ``success`` is a bool,
        ``message`` is a status string, and ``preview`` is a DataFrame with
        the first 10 parsed rows (empty on failure).
    """
    try:
        with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
            content = f.read()

        # Nothing to structure: skip the model call entirely.
        if not content.strip():
            return False, "Uploaded file is empty.", pd.DataFrame()

        structure_prompt = f"""
        Convert this text into valid CSV format:
        {content}
        Requirements:
        1. First row must be headers
        2. Consistent columns per row
        3. Quote fields containing commas
        4. Maintain original data relationships
        Return ONLY the CSV content.
        """
        # The agent's answer is not guaranteed to be a str, and models often
        # wrap CSV in a Markdown fence despite the instruction; normalise
        # both before parsing so the fence does not become the header row.
        csv_output = _strip_code_fences(str(agent.run(structure_prompt)))

        try:
            df = pd.read_csv(
                StringIO(csv_output),
                on_bad_lines='warn',
                dtype=str,
                encoding_errors='ignore'
            ).dropna(how='all')
        except (pd.errors.ParserError, pd.errors.EmptyDataError) as pe:
            return False, f"CSV Parsing Error: {str(pe)}", pd.DataFrame()

        if df.empty or len(df.columns) == 0:
            return False, "No structured data found", pd.DataFrame()

        # Replace whatever was stored before with the newly parsed table.
        clear_database()
        table = create_dynamic_table(df)
        insert_rows_into_table(df.to_dict('records'), table)
        return True, "Text analyzed successfully!", df.head(10)
    except Exception as e:
        return False, f"Processing error: {str(e)}", pd.DataFrame()
def handle_upload(file_obj):
    """Process an uploaded file and build the six values the UI expects.

    The returned list holds, in order: status text, preview table, schema
    markdown, and three visibility updates (first shown on failure, the
    other two shown on success).
    """
    def _failure(status_text):
        # Shared response for "nothing uploaded" and "processing failed".
        return [
            status_text,
            None,
            "No schema",
            gr.update(visible=True),
            gr.update(visible=False),
            gr.update(visible=False)
        ]

    if file_obj is None:
        return _failure("Please upload a text file.")

    success, message, df = process_txt_file(file_obj)
    if not success:
        return _failure(message)

    schema_lines = [f"- {col} (text)" for col in df.columns]
    schema = "\n".join(schema_lines)
    return [
        message,
        df,
        f"### Detected Schema:\n```\n{schema}\n```",
        gr.update(visible=False),
        gr.update(visible=True),
        gr.update(visible=True)
    ]
def query_analysis(user_query: str) -> str:
    """Answer a natural-language question about the stored data.

    Only the first rows of the table (``df.head()``) are included in the
    prompt sent to the agent.

    Args:
        user_query: the question typed by the user.

    Returns:
        The agent's answer as a string, or a short message when the question
        is blank, no data is available, or an error occurs.
    """
    # Reject blank input up front so no database read or model call is made.
    question = (user_query or "").strip()
    if not question:
        return "Please enter a question about the data."

    try:
        df = get_data_table()
        if df.empty:
            return "Please upload and process a file first."

        analysis_prompt = f"""
        Analyze this data:
        {df.head().to_csv()}
        Question: {question}
        Provide:
        1. Direct answer
        2. Numerical formatting
        3. Data references
        Use Markdown formatting.
        """
        # The agent may hand back a non-string object; honour the declared
        # return type so the Markdown output always receives text.
        return str(agent.run(analysis_prompt))
    except Exception as e:
        return f"Query error: {str(e)}"
def download_csv():
    """Write the current table to a CSV file and return its path.

    Returns None when there is no data to export. The file is always written
    as ``processed_data.csv`` in the system temp directory, so each export
    overwrites the previous one.
    """
    table_df = get_data_table()
    if table_df.empty:
        return None

    csv_path = os.path.join(tempfile.gettempdir(), "processed_data.csv")
    table_df.to_csv(csv_path, index=False)
    return csv_path
# Gradio interface setup
# NOTE(review): the nesting below was reconstructed from a source whose
# indentation had been stripped — confirm the layout against the running app.
with gr.Blocks() as demo:
    # Upload view: shown initially, hidden once a file has been processed.
    with gr.Group() as upload_group:
        gr.Markdown("""
        # Text Data Analyzer
        Upload unstructured text files to analyze and query their data
        """)
        file_input = gr.File(
            label="Upload Text File",
            file_types=[".txt"],
            type="filepath"
        )
        status = gr.Textbox(label="Processing Status", interactive=False)

    # Query view: hidden until handle_upload reports success.
    with gr.Group(visible=False) as query_group:
        with gr.Row():
            with gr.Column(scale=1):
                with gr.Row():
                    user_input = gr.Textbox(label="Ask about the data", scale=4)
                    submit_btn = gr.Button("Submit", scale=1)
                query_output = gr.Markdown(label="Analysis Results")
            with gr.Column(scale=2):
                gr.Markdown("### Extracted Data Preview")
                data_table = gr.Dataframe(
                    label="Structured Data",
                    interactive=False
                )
                download_btn = gr.DownloadButton(
                    "Download as CSV",
                    visible=False
                )
        schema_display = gr.Markdown()
        refresh_btn = gr.Button("Refresh View")

    def _refresh_preview():
        """Reload the preview rows without touching the schema panel."""
        # An argument-less gr.update() leaves schema_display unchanged, so the
        # detected schema is no longer replaced by a static status string.
        return get_data_table().head(10), gr.update()

    # Event handlers
    file_input.upload(
        fn=handle_upload,
        inputs=file_input,
        outputs=[status, data_table, schema_display, upload_group, query_group, download_btn]
    )
    # Clicking "Submit" and pressing Enter in the textbox run the same query.
    for query_trigger in (submit_btn.click, user_input.submit):
        query_trigger(
            fn=query_analysis,
            inputs=user_input,
            outputs=query_output
        )
    refresh_btn.click(
        fn=_refresh_preview,
        outputs=[data_table, schema_display]
    )
    download_btn.click(
        fn=download_csv,
        outputs=download_btn
    )
if __name__ == "__main__":
    # Listen on every interface (0.0.0.0), port 7860, with show_error enabled.
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "show_error": True,
    }
    demo.launch(**launch_options)