# shukdevdattaEX's picture
# Update v2.txt
# dd6870b verified
import gradio as gr
import pandas as pd
import aiohttp
import asyncio
import json
import os
import numpy as np
import plotly.express as px
import plotly.graph_objects as go
from typing import Optional, Tuple, Dict, Any
import logging
from datetime import datetime
import re
from jinja2 import Template
import markdown # Requires 'markdown' package: install via `pip install markdown`
# Configure the root logger at INFO level and create this module's logger,
# which the analyzer and the handler functions below use for error reporting.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class EnhancedDataAnalyzer:
    """Validate, load, summarise and chart an uploaded table, and query an LLM about it.

    Instance state:
        api_base_url: chat-completions endpoint used by ``analyze_with_chutes``.
        max_file_size: upload size limit in bytes, enforced by ``validate_file``.
        conversation_history: list initialised empty; not otherwise used in this class.
        current_df: the DataFrame loaded by the last ``process_file`` call.
        current_charts: list of HTML chart fragments from the last successful
            ``generate_visualizations`` call, or ``None``.

    NOTE(review): several emoji/symbol literals in this class look mis-encoded
    (UTF-8 text read through a single-byte code page). They are kept exactly as
    found here; confirm against the original file before changing them.
    """

    def __init__(self):
        self.api_base_url = "https://llm.chutes.ai/v1/chat/completions"
        self.max_file_size = 50 * 1024 * 1024  # 50MB limit
        self.conversation_history = []
        self.current_df = None
        self.current_charts = None

    def validate_api_key(self, api_key: str) -> bool:
        """Return True when *api_key* has at least 10 non-blank characters.

        The UI tells the user the minimum is 10 characters, so a key of
        exactly 10 characters is accepted.
        """
        return bool(api_key and len(api_key.strip()) >= 10)

    def validate_file(self, file) -> Tuple[bool, str]:
        """Check that an upload exists, is within the size limit and has a supported extension.

        Args:
            file: an object exposing the path as ``.name``, or the path itself.

        Returns:
            ``(is_valid, message)``; the message explains a rejection.
        """
        if not file:
            return False, "No file uploaded"
        path = getattr(file, "name", file)
        try:
            file_size = os.path.getsize(path)
        except OSError as exc:
            # Report an unreadable upload instead of crashing the caller.
            return False, f"Could not read uploaded file: {exc}"
        if file_size > self.max_file_size:
            return False, f"File too large. Maximum size: {self.max_file_size // (1024*1024)}MB"
        file_extension = os.path.splitext(path)[1].lower()
        if file_extension not in ('.csv', '.xlsx', '.xls'):
            return False, "Unsupported format. Please upload CSV or Excel files only."
        return True, "File valid"

    @staticmethod
    def _parse_sse_line(raw_line: bytes) -> Tuple[bool, str]:
        """Decode one streamed response line into ``(finished, content)``.

        ``finished`` is True only for the ``[DONE]`` sentinel. ``content`` is
        the text delta carried by the line, or ``""`` when the line is not a
        data line, is not valid JSON, or carries no text.
        """
        text = raw_line.decode("utf-8", errors="replace").strip()
        if not text.startswith("data:"):
            return False, ""
        payload = text[len("data:"):].strip()
        if payload == "[DONE]":
            return True, ""
        try:
            chunk = json.loads(payload)
        except json.JSONDecodeError:
            return False, ""
        choices = chunk.get("choices") if isinstance(chunk, dict) else None
        if not choices or not isinstance(choices[0], dict):
            return False, ""
        # "delta" or "content" may be present but null.
        delta = choices[0].get("delta") or {}
        return False, delta.get("content") or ""

    async def analyze_with_chutes(self, api_token: str, data_summary: str, user_question: Optional[str] = None) -> str:
        """Send the data summary (and optional question) to the chat API and collect the streamed reply.

        Args:
            api_token: bearer token for the API; surrounding whitespace is stripped.
            data_summary: text description of the dataset embedded in the prompt.
            user_question: when given, the model is asked to answer it instead
                of producing the general analysis.

        Returns:
            The concatenated model output, or a user-facing message describing
            an HTTP, timeout or connection failure. This method does not raise.
        """
        headers = {
            "Authorization": f"Bearer {api_token.strip()}",
            "Content-Type": "application/json",
        }
        # Create context-aware prompt
        if user_question:
            prompt = (
                "You are a data analyst expert. Based on this dataset:\n"
                f"{data_summary}\n"
                f"User's specific question: {user_question}\n"
                "Provide a detailed, actionable answer with specific data points and recommendations."
            )
        else:
            prompt = (
                "You are a senior data analyst. Analyze this dataset thoroughly:\n"
                f"{data_summary}\n"
                "Provide a comprehensive analysis including:\n"
                "1. **Key Statistical Insights**: Most important numbers and what they mean\n"
                "2. **Patterns & Trends**: Notable patterns, correlations, or anomalies\n"
                "3. **Data Quality Assessment**: Missing values, outliers, data consistency\n"
                "4. **Business Intelligence**: Actionable insights and opportunities\n"
                "5. **Recommendations**: Specific next steps or areas to investigate\n"
                "Format your response with clear sections and bullet points for readability."
            )
        body = {
            "model": "openai/gpt-oss-20b",
            "messages": [
                {
                    "role": "system",
                    "content": "You are an expert data analyst who provides clear, actionable insights from datasets. Always structure your responses with clear headings and specific data points."
                },
                {
                    "role": "user",
                    "content": prompt
                }
            ],
            "stream": True,
            "max_tokens": 3000,
            "temperature": 0.2,
            "top_p": 0.9
        }
        try:
            timeout = aiohttp.ClientTimeout(total=30)
            async with aiohttp.ClientSession(timeout=timeout) as session:
                async with session.post(self.api_base_url, headers=headers, json=body) as response:
                    if response.status == 401:
                        return "❌ **Authentication Error**: Invalid API key. Please check your Chutes API token."
                    if response.status == 429:
                        return "⏳ **Rate Limit**: Too many requests. Please wait a moment and try again."
                    if response.status != 200:
                        return f"❌ **API Error**: Request failed with status {response.status}"
                    pieces = []
                    async for raw_line in response.content:
                        finished, content = self._parse_sse_line(raw_line)
                        if finished:
                            break
                        if content:
                            pieces.append(content)
                    full_response = "".join(pieces)
                    return full_response if full_response else "⚠️ No response received from the model."
        except asyncio.TimeoutError:
            return "⏰ **Timeout Error**: Request took too long. Please try again."
        except Exception as e:
            # Boundary handler: the UI expects a message, never an exception.
            logger.exception("API Error: %s", e)
            return f"❌ **Connection Error**: {str(e)}"

    @staticmethod
    def _read_csv_with_fallback(file_path: str) -> pd.DataFrame:
        """Read a CSV, trying UTF-8, then cp1252, then latin-1.

        latin-1 accepts every byte sequence, so it must come last; placing it
        earlier makes any later encoding unreachable.
        """
        for encoding in ('utf-8', 'cp1252', 'latin-1'):
            try:
                return pd.read_csv(file_path, encoding=encoding)
            except UnicodeDecodeError:
                continue
        raise ValueError("Could not decode CSV file. Please check file encoding.")

    @staticmethod
    def _normalise_columns(df: pd.DataFrame) -> pd.DataFrame:
        """Trim column labels and collapse internal whitespace, in place.

        Labels are converted to ``str`` first: the ``.str`` accessor is not
        available on a purely numeric index and yields NaN for non-string
        labels in a mixed one.
        """
        df.columns = df.columns.astype(str).str.strip().str.replace(r'\s+', ' ', regex=True)
        return df

    def process_file(self, file_path: str) -> Tuple[pd.DataFrame, str, str]:
        """Load a CSV/Excel file and build its text summary and chart HTML.

        Stores the loaded frame on ``self.current_df``.

        Returns:
            ``(dataframe, data_summary, charts_html)``.

        Raises:
            Exception: wrapping any failure, with the original as ``__cause__``.
        """
        try:
            file_extension = os.path.splitext(file_path)[1].lower()
            if file_extension == '.csv':
                df = self._read_csv_with_fallback(file_path)
            elif file_extension in ('.xlsx', '.xls'):
                df = pd.read_excel(file_path)
            else:
                raise ValueError("Unsupported file format. Please upload CSV or Excel files.")
            self._normalise_columns(df)
            self.current_df = df
            data_summary = self.generate_enhanced_summary(df)
            charts_html = self.generate_visualizations(df)
            return df, data_summary, charts_html
        except Exception as e:
            raise Exception(f"Error processing file: {str(e)}") from e

    def generate_enhanced_summary(self, df: pd.DataFrame) -> str:
        """Build a markdown summary of *df*.

        Covers shape, memory use, dtype counts, missing values, up to ten
        numeric columns (mean, std, IQR outlier count), up to ten
        object/category columns (cardinality and most frequent value) and the
        first three rows.
        """
        summary = []
        summary.append("# πŸ“Š Dataset Analysis Report")
        summary.append(f"**Generated**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        summary.append(f"**File Size**: {df.shape[0]:,} rows Γ— {df.shape[1]} columns")
        memory_usage = df.memory_usage(deep=True).sum() / 1024**2
        summary.append(f"**Memory Usage**: {memory_usage:.2f} MB\n")

        summary.append("## πŸ“‹ Column Types:")
        for dtype, count in df.dtypes.value_counts().items():
            summary.append(f"- **{dtype}**: {count} columns")

        missing_data = df.isnull().sum()
        missing_pct = (missing_data / len(df) * 100).round(2)
        missing_summary = missing_data[missing_data > 0].sort_values(ascending=False)
        if len(missing_summary) > 0:
            summary.append("\n## ⚠️ Missing Data:")
            for col, count in missing_summary.head(10).items():
                summary.append(f"- **{col}**: {count:,} missing ({missing_pct[col]}%)")
        else:
            summary.append("\n## βœ… Data Quality: No missing values detected!")

        numeric_cols = df.select_dtypes(include=[np.number]).columns
        if len(numeric_cols) > 0:
            summary.append(f"\n## πŸ“ˆ Numerical Columns Analysis ({len(numeric_cols)} columns):")
            for col in numeric_cols[:10]:
                series = df[col]
                stats = series.describe()
                q1, q3 = stats['25%'], stats['75%']
                fence = 1.5 * (q3 - q1)
                # IQR rule on both sides; values below the lower fence count too.
                outliers = int(((series < q1 - fence) | (series > q3 + fence)).sum())
                summary.append(f"- **{col}**: ΞΌ={stats['mean']:.2f}, Οƒ={stats['std']:.2f}, outliers={outliers}")

        categorical_cols = df.select_dtypes(include=['object', 'category']).columns
        if len(categorical_cols) > 0:
            summary.append(f"\n## πŸ“ Categorical Columns Analysis ({len(categorical_cols)} columns):")
            for col in categorical_cols[:10]:
                unique_count = df[col].nunique()
                cardinality = "High" if unique_count > len(df) * 0.9 else "Medium" if unique_count > 10 else "Low"
                modes = df[col].mode()
                most_common = modes.iloc[0] if len(modes) > 0 else "N/A"
                summary.append(f"- **{col}**: {unique_count:,} unique values ({cardinality} cardinality), Top: '{most_common}'")

        summary.append("\n## πŸ” Data Sample (First 3 Rows):")
        # Number rows by position so non-integer index labels cannot break this.
        for position, (_, row) in enumerate(df.head(3).iterrows(), start=1):
            summary.append(f"\n**Row {position}:**")
            for col, val in row.items():
                summary.append(f" - {col}: {val}")
        return "\n".join(summary)

    def generate_visualizations(self, df: pd.DataFrame) -> str:
        """Render Plotly charts for *df* as one embeddable HTML string.

        Produces, where applicable: a missing-values bar chart, a correlation
        heatmap, histograms for the first three numeric columns, bar charts
        for the first two low-cardinality categorical columns, and an overview
        bar chart. On success the fragments are stored on
        ``self.current_charts``; on failure it is left as ``None`` and an
        error paragraph is returned.
        """
        charts_html = []
        # Never let charts from a previously analysed file leak into a new report.
        self.current_charts = None
        chart_count = 0

        def add_chart(fig, div_id: str) -> None:
            """Append *fig* as an HTML fragment; only the first loads plotly.js."""
            nonlocal chart_count
            charts_html.append(
                fig.to_html(
                    full_html=False,
                    include_plotlyjs=False if chart_count else 'cdn',
                    div_id=div_id,
                )
            )
            chart_count += 1

        try:
            missing_data = df.isnull().sum()
            if missing_data.sum() > 0:
                fig = px.bar(
                    x=missing_data.index,
                    y=missing_data.values,
                    title="πŸ” Missing Data Analysis",
                    labels={'x': 'Columns', 'y': 'Missing Values Count'},
                    color=missing_data.values,
                    color_continuous_scale='Reds'
                )
                fig.update_layout(
                    height=400,
                    showlegend=False,
                    title_x=0.5,
                    xaxis_tickangle=-45
                )
                charts_html.append("<h3>πŸ“Š Data Quality Overview</h3>")
                add_chart(fig, "missing_data_chart")

            numeric_cols = df.select_dtypes(include=[np.number]).columns
            if len(numeric_cols) > 1:
                corr_matrix = df[numeric_cols].corr()
                fig = px.imshow(
                    corr_matrix,
                    title="πŸ”— Correlation Matrix - Numerical Variables",
                    color_continuous_scale='RdBu_r',
                    aspect="auto",
                    text_auto=True
                )
                fig.update_layout(height=500, title_x=0.5)
                charts_html.append("<h3>πŸ“ˆ Correlation Analysis</h3>")
                add_chart(fig, "correlation_chart")

            for i, col in enumerate(numeric_cols[:3]):
                fig = px.histogram(
                    df,
                    x=col,
                    title=f"πŸ“Š Distribution: {col}",
                    marginal="box",
                    nbins=30
                )
                fig.update_layout(height=400, title_x=0.5)
                if i == 0:
                    charts_html.append("<h3>πŸ“ˆ Data Distributions</h3>")
                add_chart(fig, f"dist_chart_{i}")

            categorical_cols = df.select_dtypes(include=['object', 'category']).columns
            categorical_heading_added = False
            for i, col in enumerate(categorical_cols[:2]):
                if df[col].nunique() > 20:
                    continue
                value_counts = df[col].value_counts().head(10)
                fig = px.bar(
                    x=value_counts.values,
                    y=value_counts.index,
                    orientation='h',
                    title=f"πŸ“Š Top 10 Values: {col}",
                    labels={'x': 'Count', 'y': col}
                )
                fig.update_layout(height=400, title_x=0.5)
                # Tie the heading to the first chart actually drawn, not to
                # the first column inspected (which may have been skipped).
                if not categorical_heading_added:
                    charts_html.append("<h3>πŸ“ Categorical Data Analysis</h3>")
                    categorical_heading_added = True
                add_chart(fig, f"cat_chart_{i}")

            summary_data = {
                'Metric': ['Total Rows', 'Total Columns', 'Numeric Columns', 'Categorical Columns', 'Missing Values'],
                'Count': [
                    len(df),
                    len(df.columns),
                    len(numeric_cols),
                    len(categorical_cols),
                    df.isnull().sum().sum()
                ]
            }
            fig = px.bar(
                summary_data,
                x='Metric',
                y='Count',
                title="πŸ“‹ Dataset Overview",
                color='Count',
                color_continuous_scale='Blues'
            )
            fig.update_layout(height=400, title_x=0.5, showlegend=False)
            charts_html.append("<h3>πŸ“Š Dataset Overview</h3>")
            add_chart(fig, "overview_chart")

            self.current_charts = charts_html
            return "\n".join(charts_html) if charts_html else "<p>No charts could be generated for this dataset.</p>"
        except Exception as e:
            logger.exception("Chart generation error: %s", e)
            return f"<p>❌ Chart generation failed: {str(e)}</p>"

    def generate_report_html(self, analysis_text: str, data_summary: str, file_name: str = "Unknown") -> str:
        """Render the standalone HTML report.

        Args:
            analysis_text: markdown produced by the model; converted to HTML.
            data_summary: plain-text summary shown inside a ``<pre>`` block.
            file_name: name shown in the report metadata.

        Returns:
            A complete HTML document. Charts come from ``self.current_charts``.

        ``file_name`` and ``data_summary`` derive from the uploaded file, so
        they are HTML-escaped in the template. The converted analysis and the
        chart fragments are inserted as markup on purpose.
        """
        html_template = """
        <!DOCTYPE html>
        <html>
        <head>
            <meta charset="UTF-8">
            <title>Data Analysis Report</title>
            <style>
                body {
                    font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
                    line-height: 1.6;
                    color: #333;
                    max-width: 1200px;
                    margin: 0 auto;
                    padding: 20px;
                    background: #f8f9fa;
                }
                .header {
                    background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
                    color: white;
                    padding: 30px;
                    border-radius: 10px;
                    margin-bottom: 30px;
                    text-align: center;
                }
                .section {
                    background: white;
                    padding: 25px;
                    margin-bottom: 20px;
                    border-radius: 8px;
                    box-shadow: 0 2px 10px rgba(0,0,0,0.1);
                }
                .chart-container {
                    margin: 20px 0;
                    padding: 15px;
                    background: #f8f9ff;
                    border-radius: 8px;
                    border-left: 4px solid #667eea;
                }
                h1, h2, h3 {
                    color: #2c3e50;
                    margin-top: 20px;
                    margin-bottom: 15px;
                }
                .metadata {
                    background: #e8f4f8;
                    padding: 15px;
                    border-radius: 5px;
                    margin-bottom: 20px;
                }
                .footer {
                    text-align: center;
                    color: #666;
                    margin-top: 40px;
                    padding: 20px;
                    background: #f1f1f1;
                    border-radius: 5px;
                }
                pre {
                    background: #f4f4f4;
                    padding: 15px;
                    border-radius: 5px;
                    overflow-x: auto;
                    white-space: pre-wrap;
                    font-size: 14px;
                }
                strong {
                    color: #2c3e50;
                    font-weight: 600;
                }
                table {
                    width: 100%;
                    border-collapse: collapse;
                    margin: 20px 0;
                }
                th, td {
                    border: 1px solid #ddd;
                    padding: 8px;
                    text-align: left;
                }
                th {
                    background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
                    color: white;
                }
                tr:nth-child(even) {
                    background-color: #f2f2f2;
                }
                .print-button {
                    background: #667eea;
                    color: white;
                    padding: 10px 20px;
                    border: none;
                    border-radius: 5px;
                    cursor: pointer;
                    font-size: 16px;
                    margin: 10px 0;
                    display: inline-block;
                }
                .print-button:hover {
                    background: #764ba2;
                }
                @media print {
                    .print-button {
                        display: none;
                    }
                    body {
                        background: white;
                    }
                    .section, .metadata, .footer {
                        box-shadow: none;
                    }
                }
            </style>
            <script>
                function printReport() {
                    window.print();
                }
            </script>
        </head>
        <body>
            <div class="header">
                <h1>πŸš€ Smart Data Analysis Report</h1>
                <p>Comprehensive AI-Powered Data Insights</p>
            </div>
            <div class="metadata">
                <strong>πŸ“ File:</strong> {{ file_name | e }}<br>
                <strong>πŸ“… Generated:</strong> {{ timestamp }}<br>
                <strong>πŸ€– Model:</strong> OpenAI gpt-oss-20b
            </div>
            <div class="section">
                <h2>🎯 AI Analysis & Insights</h2>
                <button class="print-button" onclick="printReport()">πŸ–¨οΈ Print as PDF</button>
                <div>{{ ai_analysis }}</div>
            </div>
            <div class="section">
                <h2>πŸ“Š Visualizations</h2>
                <div class="chart-container">
                    {{ charts_html }}
                </div>
            </div>
            <div class="section">
                <h2>πŸ“‹ Technical Data Summary</h2>
                <pre>{{ data_summary | e }}</pre>
            </div>
            <div class="footer">
                <p>Report generated by Smart Data Analyzer Pro β€’ Powered by Smart AI</p>
                <p>For questions or support, contact +8801719296601 (via Whatsapp)</p>
            </div>
        </body>
        </html>
        """
        template = Template(html_template)
        ai_analysis_html = markdown.markdown(analysis_text or "", extensions=['extra', 'tables'])
        charts_content = "\n".join(self.current_charts) if self.current_charts else "<p>No visualizations available</p>"
        return template.render(
            file_name=file_name,
            timestamp=datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            ai_analysis=ai_analysis_html,
            charts_html=charts_content,
            data_summary=data_summary
        )
# Module-level analyzer instance shared by analyze_data, clear_all and
# download_report below; it holds the most recently loaded DataFrame and charts.
analyzer = EnhancedDataAnalyzer()
async def analyze_data(file, api_key, user_question="", progress=gr.Progress()):
    """Validate the inputs, process the upload and fetch the AI analysis.

    Args:
        file: the uploaded file (an object exposing ``.name``, or a path).
        api_key: Chutes API key entered by the user.
        user_question: optional question; empty requests the general analysis.
        progress: Gradio progress callback.

    Returns:
        A 5-tuple ``(response_markdown, data_summary, preview_html,
        charts_html, file_path)``. On any failure the first element is an
        error message, the next three are empty strings and the last is
        ``None``.
    """
    if not file:
        return "❌ Please upload a CSV or Excel file.", "", "", "", None
    if not analyzer.validate_api_key(api_key):
        return "❌ Please enter a valid Chutes API key (minimum 10 characters).", "", "", "", None
    is_valid, validation_msg = analyzer.validate_file(file)
    if not is_valid:
        return f"❌ {validation_msg}", "", "", "", None

    file_path = getattr(file, "name", file)
    progress(0.1, desc="πŸ“ Reading file...")
    try:
        df, data_summary, charts_html = analyzer.process_file(file_path)
        progress(0.3, desc="πŸ“Š Processing data...")
        progress(0.5, desc="πŸ€– Generating AI insights...")
        ai_analysis = await analyzer.analyze_with_chutes(api_key, data_summary, user_question)
        progress(0.9, desc="✨ Finalizing results...")

        generated_at = datetime.now().strftime('%H:%M:%S')
        # Blank lines around '---' keep it a horizontal rule; directly under a
        # text line markdown would read it as a heading underline.
        response = (
            "# 🎯 Analysis Complete!\n"
            f"{ai_analysis}\n"
            "\n---\n\n"
            f"*Analysis powered by OpenAI gpt-oss-20b via Chutes β€’ Generated at {generated_at}*\n"
        )

        # Cell values come from the uploaded file, so they must be escaped
        # before being placed in the page.
        data_preview_html = df.head(15).to_html(
            classes="table table-striped table-hover",
            table_id="data-preview-table",
            escape=True
        )
        preview_css = """
<style>
#data-preview-table {
    width: 100%;
    border-collapse: collapse;
    margin: 20px 0;
    font-size: 14px;
}
#data-preview-table th {
    background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
    color: white;
    padding: 12px 8px;
    text-align: left;
    font-weight: bold;
}
#data-preview-table td {
    padding: 10px 8px;
    border-bottom: 1px solid #ddd;
}
#data-preview-table tr:hover {
    background-color: #f5f5f5;
}
</style>
"""
        styled_preview = f"{preview_css}{data_preview_html}\n"
        progress(1.0, desc="βœ… Done!")
        return response, data_summary, styled_preview, charts_html, file_path
    except Exception as e:
        # Boundary handler: the UI shows the message rather than a traceback.
        logger.exception("Analysis error: %s", e)
        return f"❌ **Error**: {str(e)}", "", "", "", None
def sync_analyze_data(file, api_key, user_question="", progress=gr.Progress()):
    """Run the ``analyze_data`` coroutine to completion and return its result tuple."""
    pending = analyze_data(file, api_key, user_question, progress)
    return asyncio.run(pending)
def clear_all():
    """Forget the analyzer's cached data and return blank values for the UI.

    The eight returned values are ``None``, six empty strings, then ``None``,
    one per component listed in the clear button's ``outputs``.
    """
    analyzer.current_charts = None
    analyzer.current_df = None
    blank_texts = ("",) * 6
    return (None, *blank_texts, None)
def download_report(analysis_text, data_summary, file_name, format_choice):
    """Write the analysis to an HTML or Markdown file and return its path.

    Args:
        analysis_text: the analysis markdown to include; empty aborts.
        data_summary: the raw data summary text.
        file_name: path of the analysed file; may be empty or ``None``.
        format_choice: ``"HTML"`` for the HTML report, anything else for Markdown.

    Returns:
        ``(report_path, status_message)``; ``report_path`` is ``None`` when
        nothing was written.

    The report file is written next to the analysed file (or in the working
    directory when no name is known). Only the base name is shown inside the
    report and the status message, so the server-side upload path is not exposed.
    """
    if not analysis_text:
        return None, "❌ No analysis data available for download."
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    file_base_name = os.path.splitext(file_name)[0] if file_name else "data_analysis"
    display_name = os.path.basename(file_name) if file_name else "Unknown"
    try:
        if format_choice == "HTML":
            content = analyzer.generate_report_html(analysis_text, data_summary, display_name)
            filename = f"{file_base_name}_analysis_report_{timestamp}.html"
            label = "HTML"
        else:  # Markdown
            content = (
                "# Data Analysis Report\n"
                f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n"
                f"File: {display_name}\n"
                "## AI Analysis:\n"
                f"{analysis_text}\n"
                "## Raw Data Summary:\n"
                f"{data_summary}\n"
            )
            filename = f"{file_base_name}_analysis_report_{timestamp}.md"
            label = "Markdown"
        with open(filename, 'w', encoding='utf-8') as f:
            f.write(content)
        return filename, f"βœ… {label} report generated successfully! File: {os.path.basename(filename)}"
    except Exception as e:
        # Boundary handler: report the failure in the status box.
        logger.exception("Report generation error: %s", e)
        return None, f"❌ Error generating report: {str(e)}"
# Build the Gradio interface: layout, event handlers and event wiring.
# NOTE(review): the source for this block had lost its indentation, so the
# nesting of rows/columns/tabs below is a reconstruction — confirm the layout.
with gr.Blocks(
    title="πŸš€ Smart Data Analyzer Pro",
    theme=gr.themes.Ocean(),
    css="""
    .gradio-container {
        font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
    }
    .tab-nav {
        background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
    }
    .upload-area {
        border: 2px dashed #667eea;
        border-radius: 10px;
        padding: 20px;
        text-align: center;
        background: #f8f9ff;
    }
    """
) as app:
    # Path of the most recently analysed file; read by the report download.
    current_file_name = gr.State("")
    gr.Markdown("""
# πŸš€ Smart Data Analyzer Pro
### AI-Powered Excel & CSV Analysis with OpenAI gpt-oss-20b
Upload your data files and get instant professional insights and downloadable reports!
""")
    with gr.Row():
        with gr.Column(scale=1):
            gr.Markdown("### βš™οΈ Configuration")
            api_key_input = gr.Textbox(
                label="πŸ”‘ Chutes API Key",
                placeholder="sk-chutes-your-api-key-here...",
                type="password",
                lines=1,
                info="Get your free API key from chutes.ai"
            )
            file_input = gr.File(
                label="πŸ“ Upload Data File",
                file_types=[".csv", ".xlsx", ".xls"],
                file_count="single",
                elem_classes=["upload-area"]
            )
            with gr.Row():
                analyze_btn = gr.Button("πŸš€ Analyze Data", variant="primary", size="lg")
                clear_btn = gr.Button("πŸ—‘οΈ Clear All", variant="secondary")
            with gr.Group():
                gr.Markdown("### πŸ“Š Quick Stats")
                file_stats = gr.Textbox(
                    label="File Information",
                    lines=3,
                    interactive=False,
                    placeholder="Upload a file to see statistics..."
                )
        with gr.Column(scale=2):
            gr.Markdown("### 🎯 Analysis Results")
            analysis_output = gr.Markdown(
                value="πŸ“‹ **Ready to analyze your data!**\n\nUpload a CSV or Excel file and click 'Analyze Data' to get started.",
                show_label=False
            )
    with gr.Tabs():
        with gr.Tab("πŸ’¬ Ask Questions"):
            question_input = gr.Textbox(
                label="❓ Ask Specific Questions About Your Data",
                placeholder="Examples:\nβ€’ What are the top 5 customers by revenue?\nβ€’ Are there any seasonal trends?\nβ€’ Which products have the highest margins?\nβ€’ What anomalies do you see in this data?",
                lines=3
            )
            ask_btn = gr.Button("πŸ” Get Answer", variant="primary")
            question_output = gr.Markdown()
        with gr.Tab("πŸ“Š Data Preview"):
            data_preview = gr.HTML(
                label="Dataset Preview",
                value="<p>Upload a file to see data preview...</p>"
            )
        with gr.Tab("πŸ” Raw Summary"):
            raw_summary = gr.Textbox(
                label="Detailed Data Summary",
                lines=15,
                max_lines=20,
                show_copy_button=True
            )
        with gr.Tab("πŸ’Ύ Export Reports"):
            gr.Markdown("### πŸ“₯ Download Your Analysis Report")
            with gr.Row():
                format_choice = gr.Radio(
                    choices=["HTML", "Markdown"],
                    value="HTML",
                    label="πŸ“„ Report Format",
                    info="Choose your preferred download format"
                )
                download_btn = gr.Button("πŸ“₯ Generate & Download Report", variant="primary", size="lg")
            download_status = gr.Textbox(label="Download Status", interactive=False)
            download_file = gr.File(label="πŸ“„ Download Link", visible=True)

    def update_file_stats(file):
        """Return a short name/size/time description of the uploaded file."""
        if not file:
            return "No file uploaded"
        try:
            path = getattr(file, "name", file)
            file_size = os.path.getsize(path) / (1024 * 1024)
            file_name = os.path.basename(path)
        except (OSError, TypeError):
            # Only an unreadable or malformed path is expected here; anything
            # else should surface instead of being silently swallowed.
            return "File information unavailable"
        return f"πŸ“„ **File**: {file_name}\nπŸ“ **Size**: {file_size:.2f} MB\n⏰ **Uploaded**: {datetime.now().strftime('%H:%M:%S')}"

    def handle_analysis(file, api_key, user_question="", progress=gr.Progress()):
        """Run the analysis and return (response, summary, preview, file path).

        The chart HTML in the analysis result is not shown in the UI and is
        dropped here.
        """
        response, summary, preview, _charts, source_path = sync_analyze_data(
            file, api_key, user_question, progress
        )
        return response, summary, preview, source_path

    def handle_question_analysis(file, api_key, question, progress=gr.Progress()):
        """Answer a user question about the uploaded data; returns markdown."""
        if not question or not question.strip():
            return "❓ Please enter a specific question about your data."
        result = sync_analyze_data(file, api_key, question, progress)
        return result[0]

    # The general analysis takes no user question; feed it a hidden, empty textbox.
    no_question = gr.Textbox(value="", visible=False)
    analyze_btn.click(
        fn=handle_analysis,
        inputs=[file_input, api_key_input, no_question],
        outputs=[analysis_output, raw_summary, data_preview, current_file_name],
        show_progress=True
    )
    ask_btn.click(
        fn=handle_question_analysis,
        inputs=[file_input, api_key_input, question_input],
        outputs=[question_output],
        show_progress=True
    )
    file_input.change(
        fn=update_file_stats,
        inputs=[file_input],
        outputs=[file_stats]
    )
    clear_btn.click(
        fn=clear_all,
        outputs=[file_input, api_key_input, question_input, analysis_output,
                 question_output, data_preview, raw_summary, current_file_name]
    )
    download_btn.click(
        fn=download_report,
        inputs=[analysis_output, raw_summary, current_file_name, format_choice],
        outputs=[download_file, download_status]
    )
    # Blank lines separate the sections so each bold heading starts its own
    # paragraph instead of being absorbed into the preceding list item.
    gr.Markdown("""
---

### πŸ’‘ Pro Tips for Better Analysis:

**🎯 For Best Results:**
- Clean your data before upload (remove extra headers, format dates consistently)
- Use descriptive column names
- Ask specific questions like "What drives the highest profits?" instead of "Analyze this data"

**πŸ“₯ Export Options:**
- **HTML**: Interactive report with embedded charts and print-to-PDF option
- **Markdown**: Simple text format for documentation

**⚑ Speed Optimization:**
- Files under 10MB process fastest
- CSV files typically load faster than Excel
- Limit to essential columns for quicker analysis

**πŸ”§ Supported Formats:** CSV, XLSX, XLS | **πŸ“ Max Size:** 50MB | **πŸš€ Response Time:** ~3-5 seconds
""")
if __name__ == "__main__":
    # Cap the request queue at 10 pending jobs, then start serving the app.
    app.queue(max_size=10).launch()