import gradio as gr
import pandas as pd
import aiohttp
import asyncio
import json
import os
import numpy as np
import plotly.express as px
from typing import Tuple
import logging
from datetime import datetime

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class EnhancedDataAnalyzer:
    def __init__(self):
        self.api_base_url = "https://llm.chutes.ai/v1/chat/completions"
        self.max_file_size = 50 * 1024 * 1024  # 50MB limit
        self.conversation_history = []

    def validate_api_key(self, api_key: str) -> bool:
        """Basic sanity check on the API key (length only, not a full format check)"""
        return bool(api_key and len(api_key.strip()) >= 10)

    def validate_file(self, file) -> Tuple[bool, str]:
        """Validate uploaded file size and extension"""
        if not file:
            return False, "No file uploaded"

        file_size = os.path.getsize(file.name)
        if file_size > self.max_file_size:
            return False, f"File too large. Maximum size: {self.max_file_size // (1024 * 1024)}MB"

        file_extension = os.path.splitext(file.name)[1].lower()
        if file_extension not in ['.csv', '.xlsx', '.xls']:
            return False, "Unsupported format. Please upload CSV or Excel files only."

        return True, "File valid"

    async def analyze_with_chutes(self, api_token: str, data_summary: str, user_question: str = None) -> str:
        """Call the Chutes chat-completions API with streaming and explicit error handling"""
        headers = {
            "Authorization": f"Bearer {api_token.strip()}",
            "Content-Type": "application/json"
        }

        # Create a context-aware prompt: targeted Q&A if the user asked a
        # question, otherwise a full structured analysis.
        if user_question:
            prompt = f"""You are a data analyst expert. Based on this dataset:

{data_summary}

User's specific question: {user_question}

Provide a detailed, actionable answer with specific data points and recommendations."""
        else:
            prompt = f"""You are a senior data analyst. Analyze this dataset thoroughly:

{data_summary}

Provide a comprehensive analysis including:
1. **Key Statistical Insights**: Most important numbers and what they mean
2. **Patterns & Trends**: Notable patterns, correlations, or anomalies
3. **Data Quality Assessment**: Missing values, outliers, data consistency
4. **Business Intelligence**: Actionable insights and opportunities
5. **Recommendations**: Specific next steps or areas to investigate

Format your response with clear sections and bullet points for readability."""

        body = {
            "model": "openai/gpt-oss-20b",
            "messages": [
                {
                    "role": "system",
                    "content": "You are an expert data analyst who provides clear, actionable insights from datasets. Always structure your responses with clear headings and specific data points."
                },
                {"role": "user", "content": prompt}
            ],
            "stream": True,
            "max_tokens": 3000,
            "temperature": 0.2,  # Very low for consistent analysis
            "top_p": 0.9
        }

        try:
            timeout = aiohttp.ClientTimeout(total=30)  # 30 second timeout
            async with aiohttp.ClientSession(timeout=timeout) as session:
                async with session.post(self.api_base_url, headers=headers, json=body) as response:
                    if response.status == 401:
                        return "❌ **Authentication Error**: Invalid API key. Please check your Chutes API token."
                    elif response.status == 429:
                        return "⏳ **Rate Limit**: Too many requests. Please wait a moment and try again."
                    elif response.status != 200:
                        return f"❌ **API Error**: Request failed with status {response.status}"

                    # Accumulate the streamed response. The stream follows the
                    # OpenAI server-sent-events format: each line is
                    # "data: <json chunk>" and the stream ends with "data: [DONE]".
                    full_response = ""
                    async for line in response.content:
                        line = line.decode("utf-8").strip()
                        if line.startswith("data: "):
                            data = line[6:]
                            if data == "[DONE]":
                                break
                            try:
                                chunk_data = json.loads(data)
                                if "choices" in chunk_data and len(chunk_data["choices"]) > 0:
                                    delta = chunk_data["choices"][0].get("delta", {})
                                    content = delta.get("content", "")
                                    if content:
                                        full_response += content
                            except json.JSONDecodeError:
                                continue  # Skip keep-alive or malformed chunks

                    return full_response if full_response else "⚠️ No response received from the model."

        except asyncio.TimeoutError:
            return "⏰ **Timeout Error**: Request took too long. Please try again."
        except Exception as e:
            logger.error(f"API Error: {str(e)}")
            return f"❌ **Connection Error**: {str(e)}"

    def process_file(self, file_path: str) -> Tuple[pd.DataFrame, str, dict]:
        """Load a CSV/Excel file and build its summary and chart data"""
        try:
            file_extension = os.path.splitext(file_path)[1].lower()

            # Read file with better error handling
            if file_extension == '.csv':
                # Try common encodings in turn; the for/else raises only if
                # every encoding fails.
                for encoding in ['utf-8', 'latin-1', 'cp1252']:
                    try:
                        df = pd.read_csv(file_path, encoding=encoding)
                        break
                    except UnicodeDecodeError:
                        continue
                else:
                    raise ValueError("Could not decode CSV file. Please check file encoding.")
            elif file_extension in ['.xlsx', '.xls']:
                df = pd.read_excel(file_path)
            else:
                raise ValueError("Unsupported file format. Please upload CSV or Excel files.")

            # Clean column names (trim and collapse whitespace)
            df.columns = df.columns.str.strip().str.replace(r'\s+', ' ', regex=True)

            # Generate enhanced summaries
            data_summary = self.generate_enhanced_summary(df)
            charts_data = self.generate_chart_data(df)

            return df, data_summary, charts_data

        except Exception as e:
            raise Exception(f"Error processing file: {str(e)}")

    def generate_enhanced_summary(self, df: pd.DataFrame) -> str:
        """Generate a comprehensive data summary with statistical insights"""
        summary = []

        # Header with timestamp
        summary.append("# 📊 Dataset Analysis Report")
        summary.append(f"**Generated**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        summary.append(f"**Shape**: {df.shape[0]:,} rows × {df.shape[1]} columns")

        # Memory usage
        memory_usage = df.memory_usage(deep=True).sum() / 1024**2
        summary.append(f"**Memory Usage**: {memory_usage:.2f} MB\n")

        # Data types breakdown
        type_counts = df.dtypes.value_counts()
        summary.append("## 📋 Column Types:")
        for dtype, count in type_counts.items():
            summary.append(f"- **{dtype}**: {count} columns")

        # Missing data analysis
        missing_data = df.isnull().sum()
        missing_pct = (missing_data / len(df) * 100).round(2)
        missing_summary = missing_data[missing_data > 0].sort_values(ascending=False)

        if len(missing_summary) > 0:
            summary.append("\n## ⚠️ Missing Data:")
            for col, count in missing_summary.head(10).items():
                pct = missing_pct[col]
                summary.append(f"- **{col}**: {count:,} missing ({pct}%)")
        else:
            summary.append("\n## ✅ Data Quality: No missing values detected!")

        # Numerical analysis
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        if len(numeric_cols) > 0:
            summary.append(f"\n## 📈 Numerical Columns Analysis ({len(numeric_cols)} columns):")
            for col in numeric_cols[:10]:  # Limit to first 10
                stats = df[col].describe()
                # Count upper-tail outliers beyond Q3 + 1.5 * IQR
                iqr = stats['75%'] - stats['25%']
                outliers = len(df[df[col] > (stats['75%'] + 1.5 * iqr)])
                summary.append(f"- **{col}**: μ={stats['mean']:.2f}, σ={stats['std']:.2f}, outliers={outliers}")

        # Categorical analysis
        categorical_cols = df.select_dtypes(include=['object', 'category']).columns
        if len(categorical_cols) > 0:
summary.append(f"\n## 📝 Categorical Columns Analysis ({len(categorical_cols)} columns):") for col in categorical_cols[:10]: # Limit to first 10 unique_count = df[col].nunique() cardinality = "High" if unique_count > len(df) * 0.9 else "Medium" if unique_count > 10 else "Low" most_common = df[col].mode().iloc[0] if len(df[col].mode()) > 0 else "N/A" summary.append(f"- **{col}**: {unique_count:,} unique values ({cardinality} cardinality), Top: '{most_common}'") # Sample data with better formatting summary.append("\n## 🔍 Data Sample (First 3 Rows):") sample_df = df.head(3) for idx, row in sample_df.iterrows(): summary.append(f"\n**Row {idx + 1}:**") for col, val in row.items(): summary.append(f" - {col}: {val}") return "\n".join(summary) def generate_chart_data(self, df: pd.DataFrame) -> dict: """Generate data for automatic visualizations""" charts = {} # Numerical distribution charts numeric_cols = df.select_dtypes(include=[np.number]).columns if len(numeric_cols) > 0: for col in numeric_cols[:3]: # First 3 numeric columns fig = px.histogram(df, x=col, title=f"Distribution of {col}") charts[f"hist_{col}"] = fig # Categorical charts categorical_cols = df.select_dtypes(include=['object', 'category']).columns if len(categorical_cols) > 0: for col in categorical_cols[:2]: # First 2 categorical columns if df[col].nunique() <= 20: # Only if reasonable number of categories value_counts = df[col].value_counts().head(10) fig = px.bar(x=value_counts.index, y=value_counts.values, title=f"Top Values in {col}") charts[f"bar_{col}"] = fig return charts # Initialize the analyzer analyzer = EnhancedDataAnalyzer() async def analyze_data(file, api_key, user_question="", progress=gr.Progress()): """Enhanced analysis function with progress tracking""" if not file: return "❌ Please upload a CSV or Excel file.", "", "", None if not analyzer.validate_api_key(api_key): return "❌ Please enter a valid Chutes API key (minimum 10 characters).", "", "", None # Validate file is_valid, validation_msg = analyzer.validate_file(file) if not is_valid: return f"❌ {validation_msg}", "", "", None progress(0.1, desc="📁 Reading file...") try: # Process the uploaded file df, data_summary, charts_data = analyzer.process_file(file.name) progress(0.3, desc="📊 Processing data...") # Generate visualizations chart_html = create_basic_charts(df) progress(0.5, desc="🤖 Generating AI insights...") # Get AI analysis ai_analysis = await analyzer.analyze_with_chutes(api_key, data_summary, user_question) progress(0.9, desc="✨ Finalizing results...") # Format the complete response response = f"""# 🎯 Analysis Complete! 
{ai_analysis}

---
*Analysis powered by OpenAI gpt-oss-20b via Chutes • Generated at {datetime.now().strftime('%H:%M:%S')}*
"""

        progress(1.0, desc="✅ Done!")
        return response, data_summary, df.head(15).to_html(classes="table table-striped"), chart_html

    except Exception as e:
        logger.error(f"Analysis error: {str(e)}")
        return f"❌ **Error**: {str(e)}", "", "", None


def create_basic_charts(df: pd.DataFrame) -> str:
    """Create basic visualizations for the dataset as embeddable HTML"""
    charts_html = []

    try:
        # Chart 1: Missing data counts by column
        missing_data = df.isnull().sum()
        if missing_data.sum() > 0:
            fig = px.bar(x=missing_data.index, y=missing_data.values,
                         title="Missing Data by Column",
                         labels={'x': 'Columns', 'y': 'Missing Count'})
            fig.update_layout(height=400, showlegend=False)
            charts_html.append(fig.to_html(include_plotlyjs='cdn'))

        # Chart 2: Numerical columns correlation (if multiple numeric columns)
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        if len(numeric_cols) > 1:
            corr_matrix = df[numeric_cols].corr()
            fig = px.imshow(corr_matrix,
                            title="Correlation Matrix",
                            color_continuous_scale='RdBu_r',
                            aspect="auto")
            fig.update_layout(height=500)
            charts_html.append(fig.to_html(include_plotlyjs='cdn'))

        # Chart 3: Distribution of first numeric column
        if len(numeric_cols) > 0:
            first_numeric = numeric_cols[0]
            fig = px.histogram(df, x=first_numeric,
                               title=f"Distribution: {first_numeric}",
                               marginal="box")
            fig.update_layout(height=400)
            charts_html.append(fig.to_html(include_plotlyjs='cdn'))

        return "\n".join(charts_html) if charts_html else "<p>No charts generated for this dataset.</p>"

    except Exception as e:
        logger.error(f"Chart generation error: {str(e)}")
        return f"<p>Chart generation failed: {str(e)}</p>"
" def sync_analyze_data(file, api_key, user_question="", progress=gr.Progress()): """Synchronous wrapper for the async analyze function""" return asyncio.run(analyze_data(file, api_key, user_question, progress)) def clear_all(): """Clear all inputs and outputs""" return None, "", "", "", "", "", None def download_summary(analysis_text, data_summary): """Generate downloadable summary report""" if not analysis_text: return None report = f"""# Data Analysis Report Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} ## AI Analysis: {analysis_text} ## Raw Data Summary: {data_summary} """ # Save to temporary file filename = f"data_analysis_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.md" with open(filename, 'w', encoding='utf-8') as f: f.write(report) return filename # Create enhanced Gradio interface with gr.Blocks( title="🚀 Smart Data Analyzer Pro", theme=gr.themes.Ocean(), css=""" .gradio-container { font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif; } .tab-nav { background: linear-gradient(135deg, #667eea 0%, #764ba2 100%); } .upload-area { border: 2px dashed #667eea; border-radius: 10px; padding: 20px; text-align: center; background: #f8f9ff; } """ ) as app: # Header gr.Markdown(""" # 🚀 Smart Data Analyzer Pro ### AI-Powered Excel & CSV Analysis with OpenAI gpt-oss-20b Upload your data files and get instant professional insights, visualizations, and recommendations! """) # Main interface with gr.Row(): with gr.Column(scale=1): # Configuration section gr.Markdown("### ⚙️ Configuration") api_key_input = gr.Textbox( label="🔑 Chutes API Key", placeholder="sk-chutes-your-api-key-here...", type="password", lines=1, info="Get your free API key from chutes.ai" ) file_input = gr.File( label="📁 Upload Data File", file_types=[".csv", ".xlsx", ".xls"], file_count="single", elem_classes=["upload-area"] ) with gr.Row(): analyze_btn = gr.Button("🚀 Analyze Data", variant="primary", size="lg") clear_btn = gr.Button("🗑️ Clear All", variant="secondary") # Quick stats display with gr.Group(): gr.Markdown("### 📊 Quick Stats") file_stats = gr.Textbox( label="File Information", lines=3, interactive=False, placeholder="Upload a file to see statistics..." ) with gr.Column(scale=2): # Results section gr.Markdown("### 🎯 Analysis Results") analysis_output = gr.Markdown( value="📋 **Ready to analyze your data!**\n\nUpload a CSV or Excel file and click 'Analyze Data' to get started.", show_label=False ) # Advanced features in tabs with gr.Tabs(): with gr.Tab("💬 Ask Questions"): question_input = gr.Textbox( label="❓ Ask Specific Questions About Your Data", placeholder="Examples:\n• What are the top 5 customers by revenue?\n• Are there any seasonal trends?\n• Which products have the highest margins?\n• What anomalies do you see in this data?", lines=3 ) ask_btn = gr.Button("🔍 Get Answer", variant="primary") question_output = gr.Markdown() with gr.Tab("📊 Data Preview"): data_preview = gr.HTML( label="Dataset Preview", value="Upload a file to see data preview...
" ) with gr.Tab("📈 Visualizations"): charts_output = gr.HTML( label="Auto-Generated Charts", value="Charts will appear here after analysis...
" ) with gr.Tab("🔍 Raw Summary"): raw_summary = gr.Textbox( label="Detailed Data Summary", lines=15, max_lines=20, show_copy_button=True ) with gr.Tab("💾 Export"): gr.Markdown("### Download Your Analysis Report") download_btn = gr.Button("📥 Download Report (.md)", variant="secondary") download_file = gr.File(label="Download Link", visible=False) # Event handlers def update_file_stats(file): if not file: return "No file uploaded" try: file_size = os.path.getsize(file.name) / (1024 * 1024) # MB file_name = os.path.basename(file.name) return f"📄 **File**: {file_name}\n📏 **Size**: {file_size:.2f} MB\n⏰ **Uploaded**: {datetime.now().strftime('%H:%M:%S')}" except: return "File information unavailable" # Main analysis analyze_btn.click( fn=sync_analyze_data, inputs=[file_input, api_key_input, gr.Textbox(value="", visible=False)], outputs=[analysis_output, raw_summary, data_preview, charts_output], show_progress=True ) # Follow-up questions ask_btn.click( fn=sync_analyze_data, inputs=[file_input, api_key_input, question_input], outputs=[question_output, gr.Textbox(visible=False), gr.HTML(visible=False), gr.HTML(visible=False)], show_progress=True ) # File stats update file_input.change( fn=update_file_stats, inputs=[file_input], outputs=[file_stats] ) # Clear functionality clear_btn.click( fn=clear_all, outputs=[file_input, api_key_input, question_input, analysis_output, question_output, data_preview, charts_output] ) # Download functionality download_btn.click( fn=download_summary, inputs=[analysis_output, raw_summary], outputs=[download_file] ) # Footer with usage tips gr.Markdown(""" --- ### 💡 Pro Tips for Better Analysis: **🎯 For Best Results:** - Clean your data before upload (remove extra headers, format dates consistently) - Use descriptive column names - Ask specific questions like "What drives the highest profits?" instead of "Analyze this data" **⚡ Speed Optimization:** - Files under 10MB process fastest - CSV files typically load faster than Excel - Limit to essential columns for quicker analysis **🔧 Supported Formats:** CSV, XLSX, XLS | **📏 Max Size:** 50MB | **🚀 Response Time:** ~3-5 seconds """) # Launch configuration if __name__ == "__main__": app.queue(max_size=10) # Handle multiple users app.launch( share=True )