import gradio as gr
import pandas as pd
import aiohttp
import asyncio
import json
import os
import numpy as np
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from typing import Tuple
import logging
from datetime import datetime
from jinja2 import Template
import markdown
from scipy import stats
import warnings

warnings.filterwarnings('ignore')

# Configure logging with better formatting
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


class AdvancedDataAnalyzer:
    def __init__(self):
        self.api_base_url = "https://llm.chutes.ai/v1/chat/completions"
        self.max_file_size = 100 * 1024 * 1024  # 100 MB
        self.conversation_history = []
        self.current_df = None
        self.current_charts = None
        self.analysis_cache = {}
        self.supported_formats = ['.csv', '.xlsx', '.xls', '.json', '.parquet', '.tsv']

    def validate_api_key(self, api_key: str) -> Tuple[bool, str]:
        """Enhanced API key validation."""
        if not api_key or len(api_key.strip()) < 10:
            return False, "API key must be at least 10 characters long"
        # Check for common API key patterns
        api_key = api_key.strip()
        if not (api_key.startswith(('sk-', 'pk-', 'Bearer ')) or len(api_key) > 20):
            return False, "API key format appears invalid"
        return True, "Valid API key format"

    def validate_file(self, file) -> Tuple[bool, str]:
        """Enhanced file validation with better error messages."""
        if not file:
            return False, "No file uploaded"
        try:
            file_size = os.path.getsize(file.name)
            if file_size > self.max_file_size:
                return False, f"File too large. Maximum size: {self.max_file_size // (1024 * 1024)}MB"
            if file_size == 0:
                return False, "File is empty"
            file_extension = os.path.splitext(file.name)[1].lower()
            if file_extension not in self.supported_formats:
                return False, f"Unsupported format. Supported: {', '.join(self.supported_formats)}"
            return True, "File validation passed"
        except Exception as e:
            return False, f"File validation error: {str(e)}"
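    # Example (sketch) of the validator contract — both methods return an
    # (ok, message) tuple rather than raising. The key below is hypothetical,
    # used purely for illustration:
    #
    #     ok, msg = AdvancedDataAnalyzer().validate_api_key("sk-chutes-abc123def456")
    #     # -> (True, "Valid API key format")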
    async def analyze_with_chutes(self, api_token: str, data_summary: str,
                                  user_question: str = None,
                                  analysis_type: str = "comprehensive") -> str:
        """Enhanced API call with better prompts and error handling."""
        headers = {
            "Authorization": f"Bearer {api_token.strip()}",
            "Content-Type": "application/json",
            "User-Agent": "SmartDataAnalyzer/2.0"
        }

        # Create specialized prompts based on analysis type
        prompts = {
            "comprehensive": f"""You are a senior data scientist with 10+ years of experience. Analyze this dataset comprehensively:

{data_summary}

Provide a thorough analysis with:
1. **Executive Summary**: 3-4 key takeaways for stakeholders
2. **Statistical Insights**: Important numbers, distributions, and what they reveal
3. **Pattern Recognition**: Trends, correlations, seasonality, anomalies
4. **Data Quality Assessment**: Completeness, accuracy, consistency issues
5. **Business Intelligence**: Actionable insights and opportunities
6. **Risk Analysis**: Potential data quality issues or business risks
7. **Recommendations**: Specific, prioritized next steps

Use bullet points, specific numbers, and clear explanations.""",
            "quick": f"""Provide a quick but insightful analysis of this dataset:

{data_summary}

Focus on:
- Top 3 most important findings
- Any obvious patterns or anomalies
- Key business insights
- Quick recommendations

Keep it concise but valuable.""",
            "question": f"""Based on this dataset:

{data_summary}

User's specific question: {user_question}

Provide a detailed, data-driven answer with:
- Direct answer to the question
- Supporting evidence from the data
- Additional related insights
- Specific recommendations
- Follow-up questions to consider"""
        }

        prompt = prompts.get(analysis_type, prompts["comprehensive"])
        if user_question and analysis_type != "question":
            prompt += f"\n\nUser's additional question: {user_question}"

        body = {
            "model": "openai/gpt-oss-20b",
            "messages": [
                {
                    "role": "system",
                    "content": """You are an expert data scientist and business analyst. Provide clear, actionable insights with specific data points. Use markdown formatting for better readability. Always include:
- Specific numbers and percentages
- Clear section headers
- Bullet points for key insights
- Bold text for important findings
- Recommendations with priority levels"""
                },
                {
                    "role": "user",
                    "content": prompt
                }
            ],
            "stream": True,
            "max_tokens": 4000,
            "temperature": 0.3,
            "top_p": 0.9
        }

        try:
            timeout = aiohttp.ClientTimeout(total=45)
            async with aiohttp.ClientSession(timeout=timeout) as session:
                async with session.post(self.api_base_url, headers=headers, json=body) as response:
                    if response.status == 401:
                        return "❌ **Authentication Error**: Invalid API key. Please verify your Chutes API token."
                    elif response.status == 429:
                        return "⏳ **Rate Limit Exceeded**: Too many requests. Please wait 30 seconds and try again."
                    elif response.status == 503:
                        return "🔧 **Service Unavailable**: API temporarily unavailable. Please try again later."
                    elif response.status != 200:
                        error_text = await response.text()
                        return f"❌ **API Error {response.status}**: {error_text[:200]}"

                    full_response = ""
                    async for line in response.content:
                        line = line.decode("utf-8").strip()
                        if line.startswith("data: "):
                            data = line[6:]
                            if data == "[DONE]":
                                break
                            try:
                                chunk_data = json.loads(data)
                                if "choices" in chunk_data and len(chunk_data["choices"]) > 0:
                                    delta = chunk_data["choices"][0].get("delta", {})
                                    content = delta.get("content", "")
                                    if content:
                                        full_response += content
                            except json.JSONDecodeError:
                                continue

                    if not full_response:
                        return "⚠️ **Empty Response**: No analysis received. Please try again."

                    # Store in conversation history
                    self.conversation_history.append({
                        "timestamp": datetime.now(),
                        "question": user_question or "General Analysis",
                        "response": full_response[:500] + "..." if len(full_response) > 500 else full_response
                    })
                    return full_response
        except asyncio.TimeoutError:
            return "⏰ **Timeout Error**: Analysis took too long. Try with a smaller file or simpler question."
        except aiohttp.ClientError as e:
            logger.error(f"HTTP Error: {str(e)}")
            return "🌐 **Connection Error**: Unable to reach API. Check your internet connection."
        except Exception as e:
            logger.error(f"Unexpected API Error: {str(e)}")
            return f"❌ **Unexpected Error**: {str(e)}"
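    # The streaming loop above parses OpenAI-style chat-completion
    # Server-Sent Events. Each useful line of the stream typically looks like
    # (illustrative payload, not captured from the live Chutes API):
    #
    #     data: {"choices": [{"delta": {"content": "Hello"}}]}
    #     data: [DONE]
    #
    # Only the incremental "content" fields are accumulated; malformed chunks
    # are skipped instead of aborting the whole response.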
    def process_file(self, file_path: str, sample_size: int = None) -> Tuple[pd.DataFrame, str, str]:
        """Enhanced file processing with support for multiple formats and sampling."""
        try:
            file_extension = os.path.splitext(file_path)[1].lower()

            # Enhanced file loading with multiple encodings and error handling
            if file_extension == '.csv':
                for encoding in ['utf-8', 'latin-1', 'cp1252', 'iso-8859-1']:
                    for sep in [',', ';', '\t', '|']:
                        try:
                            df = pd.read_csv(file_path, encoding=encoding, sep=sep, low_memory=False)
                            if df.shape[1] > 1:  # Valid separator found
                                break
                        except (UnicodeDecodeError, pd.errors.ParserError):
                            continue
                    else:
                        continue
                    break
                else:
                    raise ValueError("Could not decode CSV file with any supported encoding/separator")
            elif file_extension == '.tsv':
                df = pd.read_csv(file_path, sep='\t', encoding='utf-8')
            elif file_extension in ['.xlsx', '.xls']:
                df = pd.read_excel(file_path, engine='openpyxl' if file_extension == '.xlsx' else 'xlrd')
            elif file_extension == '.json':
                with open(file_path, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                df = pd.json_normalize(data) if isinstance(data, list) else pd.DataFrame(data)
            elif file_extension == '.parquet':
                df = pd.read_parquet(file_path)
            else:
                raise ValueError(f"Unsupported file format: {file_extension}")

            # Data cleaning and preprocessing
            df.columns = df.columns.astype(str).str.strip().str.replace(r'\s+', ' ', regex=True)

            # Remove completely empty rows and columns
            df = df.dropna(how='all').dropna(axis=1, how='all')

            # Sample large datasets for performance
            original_size = len(df)
            if sample_size and len(df) > sample_size:
                df = df.sample(n=sample_size, random_state=42)
                logger.info(f"Sampled {sample_size} rows from {original_size} total rows")

            # Auto-detect and convert data types
            df = self.auto_detect_types(df)

            self.current_df = df
            data_summary = self.generate_comprehensive_summary(df, original_size)
            charts_html = self.generate_advanced_visualizations(df)
            return df, data_summary, charts_html
        except Exception as e:
            logger.error(f"File processing error: {str(e)}")
            raise Exception(f"Error processing file: {str(e)}")

    def auto_detect_types(self, df: pd.DataFrame) -> pd.DataFrame:
        """Intelligent data type detection and conversion."""
        for col in df.columns:
            if df[col].dtype == 'object':
                # Try to convert to datetime when the column name suggests one;
                # on failure the column is left unchanged
                if any(keyword in col.lower() for keyword in ['date', 'time', 'created', 'updated', 'timestamp']):
                    try:
                        df[col] = pd.to_datetime(df[col])
                        continue
                    except (ValueError, TypeError):
                        pass
                # Try to convert to numeric
                try:
                    # Remove common currency symbols and thousands separators
                    cleaned_col = df[col].astype(str).str.replace(r'[$,€£¥₹]', '', regex=True)
                    cleaned_col = cleaned_col.str.replace(r'[^\d.-]', '', regex=True)
                    numeric_col = pd.to_numeric(cleaned_col, errors='coerce')
                    # If more than 70% of values parse as numeric, convert
                    if numeric_col.notna().sum() / len(df) > 0.7:
                        df[col] = numeric_col
                        continue
                except Exception:
                    pass
                # Convert to category if low cardinality
                if df[col].nunique() / len(df) < 0.1 and df[col].nunique() < 50:
                    df[col] = df[col].astype('category')
        return df
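    # A quick illustration of the heuristics above (hypothetical data; the
    # column names and values are chosen to trigger each branch):
    #
    #     raw = pd.DataFrame({
    #         "signup_date": ["2024-01-01"] * 30,  # name contains "date" -> parsed as datetime
    #         "revenue": ["$1,200.50"] * 30,       # all values parse as numbers once "$" and "," are stripped
    #         "tier": ["gold", "silver"] * 15,     # 2 unique values in 30 rows -> low cardinality -> category
    #     })
    #     typed = AdvancedDataAnalyzer().auto_detect_types(raw)
    #     # typed.dtypes -> datetime64[ns], float64, category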
    def generate_comprehensive_summary(self, df: pd.DataFrame, original_size: int = None) -> str:
        """Generate a detailed statistical summary with advanced insights."""
        summary = []

        # Header with enhanced metadata
        summary.append("# 📊 Advanced Dataset Analysis Report")
        summary.append(f"**Generated**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        summary.append(f"**Dataset Size**: {df.shape[0]:,} rows × {df.shape[1]} columns")
        if original_size and original_size != len(df):
            summary.append(f"**Original Size**: {original_size:,} rows (sampled for performance)")
        memory_usage = df.memory_usage(deep=True).sum() / 1024**2
        summary.append(f"**Memory Usage**: {memory_usage:.2f} MB")
        summary.append(f"**Data Density**: {(1 - df.isnull().sum().sum() / (df.shape[0] * df.shape[1])):.1%} complete\n")

        # Enhanced column type analysis
        type_counts = df.dtypes.value_counts()
        summary.append("## 📋 Column Type Distribution:")
        for dtype, count in type_counts.items():
            percentage = count / len(df.columns) * 100
            summary.append(f"- **{dtype}**: {count} columns ({percentage:.1f}%)")

        # Advanced missing data analysis
        missing_data = df.isnull().sum()
        missing_pct = (missing_data / len(df) * 100).round(2)
        missing_summary = missing_data[missing_data > 0].sort_values(ascending=False)
        if len(missing_summary) > 0:
            summary.append("\n## ⚠️ Data Quality Issues:")
            total_missing = missing_data.sum()
            summary.append(f"**Total Missing Values**: {total_missing:,} ({total_missing / (df.shape[0] * df.shape[1]) * 100:.2f}% of all data)")
            for col, count in missing_summary.head(10).items():
                pct = missing_pct[col]
                severity = "🔴 Critical" if pct > 50 else "🟡 Moderate" if pct > 20 else "🟢 Minor"
                summary.append(f"- **{col}**: {count:,} missing ({pct}%) - {severity}")
        else:
            summary.append("\n## ✅ Data Quality: Perfect! No missing values detected")

        # Enhanced numerical analysis with statistical measures
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        if len(numeric_cols) > 0:
            summary.append(f"\n## 📈 Numerical Analysis ({len(numeric_cols)} columns):")
            for col in numeric_cols[:8]:  # Analyze top 8 numeric columns
                stats_data = df[col].describe()

                # Advanced statistical measures
                skewness = stats.skew(df[col].dropna())
                kurtosis = stats.kurtosis(df[col].dropna())  # computed for completeness; not currently reported

                # Outlier detection using the IQR method
                Q1 = stats_data['25%']
                Q3 = stats_data['75%']
                IQR = Q3 - Q1
                outliers = len(df[(df[col] < (Q1 - 1.5 * IQR)) | (df[col] > (Q3 + 1.5 * IQR))])

                # Distribution shape analysis
                if abs(skewness) < 0.5:
                    distribution = "Normal"
                elif skewness > 0.5:
                    distribution = "Right-skewed"
                else:
                    distribution = "Left-skewed"

                summary.append(f"- **{col}**:")
                summary.append(f"  - Range: {stats_data['min']:.2f} to {stats_data['max']:.2f}")
                summary.append(f"  - Central: μ={stats_data['mean']:.2f}, median={stats_data['50%']:.2f}")
                summary.append(f"  - Spread: σ={stats_data['std']:.2f}, IQR={IQR:.2f}")
                summary.append(f"  - Shape: {distribution} (skew={skewness:.2f})")
                summary.append(f"  - Outliers: {outliers} ({outliers / len(df) * 100:.1f}%)")

        # Enhanced categorical analysis
        categorical_cols = df.select_dtypes(include=['object', 'category']).columns
        if len(categorical_cols) > 0:
            summary.append(f"\n## 📝 Categorical Analysis ({len(categorical_cols)} columns):")
            for col in categorical_cols[:8]:
                unique_count = df[col].nunique()
                total_count = len(df[col].dropna())

                # Cardinality classification
                cardinality_ratio = unique_count / total_count
                if cardinality_ratio > 0.9:
                    cardinality = "🔴 Very High (likely ID field)"
                elif cardinality_ratio > 0.5:
                    cardinality = "🟡 High"
                elif cardinality_ratio > 0.1:
                    cardinality = "🟢 Medium"
                else:
                    cardinality = "🔵 Low"

                # Top values analysis
                value_counts = df[col].value_counts()
                most_common = value_counts.iloc[0] if len(value_counts) > 0 else 0
                most_common_pct = (most_common / total_count * 100) if total_count > 0 else 0

                summary.append(f"- **{col}**:")
                summary.append(f"  - Unique values: {unique_count:,} ({cardinality})")
                summary.append(f"  - Most frequent: '{value_counts.index[0]}' ({most_common:,} times, {most_common_pct:.1f}%)")
                if len(value_counts) > 1:
                    entropy = stats.entropy(value_counts.values)
                    summary.append(f"  - Diversity index: {entropy:.2f}")

        # Date/time analysis
        datetime_cols = df.select_dtypes(include=['datetime64']).columns
        if len(datetime_cols) > 0:
            summary.append(f"\n## 📅 Temporal Analysis ({len(datetime_cols)} columns):")
            for col in datetime_cols[:3]:
                date_range = df[col].max() - df[col].min()
                summary.append(f"- **{col}**: {df[col].min()} to {df[col].max()} (span: {date_range.days} days)")

        # Advanced data profiling
        summary.append("\n## 🔍 Advanced Data Profiling:")

        # Duplicate analysis
        duplicate_rows = df.duplicated().sum()
        summary.append(f"- **Duplicate rows**: {duplicate_rows:,} ({duplicate_rows / len(df) * 100:.2f}%)")

        # Strong pairwise correlations (top 5)
        if len(numeric_cols) > 1:
            corr_matrix = df[numeric_cols].corr()
            high_corr_pairs = []
            for i in range(len(corr_matrix.columns)):
                for j in range(i + 1, len(corr_matrix.columns)):
                    corr_val = corr_matrix.iloc[i, j]
                    if abs(corr_val) > 0.7:  # Strong correlation threshold
                        high_corr_pairs.append((corr_matrix.columns[i], corr_matrix.columns[j], corr_val))
            if high_corr_pairs:
                summary.append("- **Strong correlations detected**:")
                for col1, col2, corr_val in sorted(high_corr_pairs, key=lambda x: abs(x[2]), reverse=True)[:5]:
                    summary.append(f"  - {col1} ↔ {col2}: {corr_val:.3f}")

        # Data sample with enhanced formatting
        summary.append("\n## 🔍 Enhanced Data Sample (First 3 Rows):")
        sample_df = df.head(3)
        for idx, row in sample_df.iterrows():
            summary.append(f"\n**Row {idx + 1}:**")
            for col, val in row.items():
                # Format values based on type
                if pd.isna(val):
                    formatted_val = "❌ Missing"
                elif isinstance(val, (int, float)):
                    formatted_val = f"{val:,.2f}" if isinstance(val, float) else f"{val:,}"
                else:
                    formatted_val = str(val)[:50] + ("..." if len(str(val)) > 50 else "")
                summary.append(f"  - **{col}**: {formatted_val}")

        return "\n".join(summary)
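    # The IQR outlier rule used above, with a worked example (illustrative
    # numbers): for Q1 = 10 and Q3 = 30, IQR = 20, so the fences are
    #     lower = Q1 - 1.5 * IQR = 10 - 30 = -20
    #     upper = Q3 + 1.5 * IQR = 30 + 30 = 60
    # and any value outside [-20, 60] is counted as an outlier.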
    def generate_advanced_visualizations(self, df: pd.DataFrame) -> str:
        """Generate comprehensive visualizations with better design."""
        charts_html = []
        try:
            # 1. Enhanced missing data visualization
            missing_data = df.isnull().sum()
            if missing_data.sum() > 0:
                missing_pct = (missing_data / len(df) * 100).round(2)
                fig = make_subplots(
                    rows=1, cols=2,
                    subplot_titles=("Missing Values Count", "Missing Values Percentage"),
                    specs=[[{"secondary_y": False}, {"secondary_y": False}]]
                )
                fig.add_trace(
                    go.Bar(x=missing_data.index, y=missing_data.values,
                           name="Count", marker_color='rgb(255, 99, 132)'),
                    row=1, col=1
                )
                fig.add_trace(
                    go.Bar(x=missing_pct.index, y=missing_pct.values,
                           name="Percentage", marker_color='rgb(255, 159, 64)'),
                    row=1, col=2
                )
                fig.update_layout(
                    title_text="🔍 Comprehensive Missing Data Analysis",
                    title_x=0.5, height=500, showlegend=False
                )
                fig.update_xaxes(tickangle=-45)
                charts_html.append('<div class="chart-section"><h3>📊 Data Quality Analysis</h3></div>')
                charts_html.append(fig.to_html(include_plotlyjs='cdn', div_id="missing_data_analysis"))

            # 2. Advanced correlation analysis
            numeric_cols = df.select_dtypes(include=[np.number]).columns
            if len(numeric_cols) > 1:
                corr_matrix = df[numeric_cols].corr()
                # Mask the upper triangle so each pair appears only once
                mask = np.triu(np.ones_like(corr_matrix, dtype=bool))
                corr_matrix_masked = corr_matrix.mask(mask)
                fig = px.imshow(
                    corr_matrix_masked,
                    title="🔗 Advanced Correlation Matrix (Lower Triangle)",
                    color_continuous_scale='RdBu_r',
                    aspect="auto",
                    text_auto=True,
                    labels=dict(color="Correlation")
                )
                fig.update_layout(height=600, title_x=0.5, font=dict(size=10))
                charts_html.append('<div class="chart-section"><h3>📈 Statistical Relationships</h3></div>')
                charts_html.append(fig.to_html(include_plotlyjs='cdn', div_id="correlation_matrix"))
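            # np.triu above marks the diagonal and everything above it, so
            # DataFrame.mask() hides the self-correlations (always 1.0) and
            # the duplicate upper-triangle entries; for a 3x3 matrix the mask is
            #     [[T, T, T],
            #      [F, T, T],
            #      [F, F, T]]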
") charts_html.append(fig.to_html(include_plotlyjs='cdn', div_id="correlation_matrix")) # 3. Advanced Distribution Analysis if len(numeric_cols) > 0: charts_html.append("

📊 Statistical Distributions

") for i, col in enumerate(numeric_cols[:4]): # Top 4 numeric columns # Create subplot with histogram and box plot fig = make_subplots( rows=2, cols=1, subplot_titles=(f"Distribution of {col}", f"Box Plot - {col}"), vertical_spacing=0.12 ) # Histogram with KDE fig.add_trace( go.Histogram(x=df[col].dropna(), name="Frequency", marker_color='rgb(75, 192, 192)', opacity=0.7, nbinsx=30), row=1, col=1 ) # Box plot fig.add_trace( go.Box(y=df[col].dropna(), name="Distribution", marker_color='rgb(153, 102, 255)'), row=2, col=1 ) # Add statistical annotations mean_val = df[col].mean() median_val = df[col].median() fig.add_vline(x=mean_val, line_dash="dash", line_color="red", annotation_text=f"Mean: {mean_val:.2f}", row=1, col=1) fig.add_vline(x=median_val, line_dash="dot", line_color="blue", annotation_text=f"Median: {median_val:.2f}", row=1, col=1) fig.update_layout( height=600, title_text=f"📊 Statistical Analysis: {col}", title_x=0.5, showlegend=False ) charts_html.append(fig.to_html(include_plotlyjs='cdn', div_id=f"distribution_{i}")) # 4. Enhanced Categorical Analysis categorical_cols = df.select_dtypes(include=['object', 'category']).columns if len(categorical_cols) > 0: charts_html.append("

📝 Categorical Data Insights

") for i, col in enumerate(categorical_cols[:3]): if df[col].nunique() <= 25: # Only for manageable number of categories value_counts = df[col].value_counts().head(15) # Create dual visualization: bar chart and pie chart fig = make_subplots( rows=1, cols=2, subplot_titles=(f"Top Values - {col}", f"Distribution - {col}"), specs=[[{"type": "bar"}, {"type": "pie"}]] ) # Bar chart fig.add_trace( go.Bar(x=value_counts.values, y=value_counts.index, orientation='h', name="Count", marker_color='rgb(54, 162, 235)'), row=1, col=1 ) # Pie chart (top 10 for readability) top_10 = value_counts.head(10) fig.add_trace( go.Pie(labels=top_10.index, values=top_10.values, name="Distribution"), row=1, col=2 ) fig.update_layout( height=500, title_text=f"📊 Category Analysis: {col}", title_x=0.5, showlegend=False ) charts_html.append(fig.to_html(include_plotlyjs='cdn', div_id=f"categorical_{i}")) # 5. Time Series Analysis (if datetime columns exist) datetime_cols = df.select_dtypes(include=['datetime64']).columns if len(datetime_cols) > 0 and len(numeric_cols) > 0: charts_html.append("

⏰ Temporal Analysis

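                # dt.to_period('M') below buckets timestamps by calendar month,
                # e.g. 2024-03-05 and 2024-03-28 both map to Period('2024-03'),
                # which is what lets groupby produce one row per month.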
") date_col = datetime_cols[0] value_col = numeric_cols[0] # Group by month for time series df_temp = df.copy() df_temp['month_year'] = df_temp[date_col].dt.to_period('M') monthly_data = df_temp.groupby('month_year')[value_col].agg(['mean', 'sum', 'count']).reset_index() monthly_data['month_year_str'] = monthly_data['month_year'].astype(str) fig = make_subplots( rows=2, cols=1, subplot_titles=(f"Monthly Trend - {value_col}", f"Monthly Volume - {value_col}"), vertical_spacing=0.1 ) # Trend line fig.add_trace( go.Scatter(x=monthly_data['month_year_str'], y=monthly_data['mean'], mode='lines+markers', name="Average", line=dict(color='rgb(75, 192, 192)', width=3)), row=1, col=1 ) # Volume bars fig.add_trace( go.Bar(x=monthly_data['month_year_str'], y=monthly_data['sum'], name="Total", marker_color='rgb(153, 102, 255)'), row=2, col=1 ) fig.update_layout( height=600, title_text="📈 Time Series Analysis", title_x=0.5, showlegend=False ) fig.update_xaxes(tickangle=-45) charts_html.append(fig.to_html(include_plotlyjs='cdn', div_id="timeseries_analysis")) # 6. Enhanced Dataset Overview Dashboard summary_data = { 'Metric': ['Total Rows', 'Total Columns', 'Numeric Columns', 'Categorical Columns', 'DateTime Columns', 'Missing Values', 'Duplicate Rows', 'Memory (MB)'], 'Count': [ len(df), len(df.columns), len(numeric_cols), len(categorical_cols), len(datetime_cols), df.isnull().sum().sum(), df.duplicated().sum(), round(df.memory_usage(deep=True).sum() / 1024**2, 2) ] } fig = px.bar( summary_data, x='Metric', y='Count', title="📋 Comprehensive Dataset Overview", color='Count', color_continuous_scale='Viridis', text='Count' ) fig.update_traces(texttemplate='%{text}', textposition='outside') fig.update_layout( height=500, title_x=0.5, showlegend=False, xaxis_tickangle=-45 ) charts_html.append("

📊 Dataset Dashboard

") charts_html.append(fig.to_html(include_plotlyjs='cdn', div_id="overview_dashboard")) # 7. Data Quality Score Visualization total_cells = df.shape[0] * df.shape[1] missing_cells = df.isnull().sum().sum() duplicate_penalty = df.duplicated().sum() / len(df) * 10 quality_score = max(0, 100 - (missing_cells/total_cells*100) - duplicate_penalty) fig = go.Figure(go.Indicator( mode = "gauge+number+delta", value = quality_score, domain = {'x': [0, 1], 'y': [0, 1]}, title = {'text': "📊 Data Quality Score"}, delta = {'reference': 95}, gauge = { 'axis': {'range': [None, 100]}, 'bar': {'color': "darkblue"}, 'steps': [ {'range': [0, 50], 'color': "lightgray"}, {'range': [50, 80], 'color': "yellow"}, {'range': [80, 100], 'color': "lightgreen"} ], 'threshold': { 'line': {'color': "red", 'width': 4}, 'thickness': 0.75, 'value': 90 } } )) fig.update_layout(height=400, title_x=0.5) charts_html.append("

🎯 Quality Assessment

") charts_html.append(fig.to_html(include_plotlyjs='cdn', div_id="quality_score")) self.current_charts = charts_html return "\n".join(charts_html) if charts_html else "

No charts could be generated for this dataset.

" except Exception as e: logger.error(f"Chart generation error: {str(e)}") return f"

❌ Advanced chart generation failed: {str(e)}

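    # Worked example of the quality score above (illustrative numbers): for a
    # 1,000 × 10 frame (10,000 cells) with 300 missing cells and 20 duplicate
    # rows,
    #     missing penalty   = 300 / 10000 * 100 = 3.0
    #     duplicate penalty = 20 / 1000 * 10    = 0.2
    #     quality_score     = max(0, 100 - 3.0 - 0.2) = 96.8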
" def generate_insights_summary(self, df: pd.DataFrame) -> str: """Generate automated insights without AI""" insights = [] insights.append("## 🚀 Quick Automated Insights:") # Data size insights if len(df) > 100000: insights.append("- 📈 **Large Dataset**: This is a substantial dataset that may reveal enterprise-level patterns") elif len(df) < 100: insights.append("- 📉 **Small Dataset**: Consider collecting more data for robust statistical analysis") # Missing data insights missing_pct = (df.isnull().sum().sum() / (df.shape[0] * df.shape[1])) * 100 if missing_pct > 20: insights.append("- ⚠️ **Data Quality Concern**: High percentage of missing data may impact analysis reliability") elif missing_pct < 5: insights.append("- ✅ **Excellent Data Quality**: Very low missing data percentage") # Numerical insights numeric_cols = df.select_dtypes(include=[np.number]).columns if len(numeric_cols) > 0: # Check for potential outliers outlier_cols = [] for col in numeric_cols: Q1 = df[col].quantile(0.25) Q3 = df[col].quantile(0.75) IQR = Q3 - Q1 outliers = len(df[(df[col] < (Q1 - 1.5 * IQR)) | (df[col] > (Q3 + 1.5 * IQR))]) if outliers / len(df) > 0.1: # More than 10% outliers outlier_cols.append(col) if outlier_cols: insights.append(f"- 🎯 **Outlier Detection**: {len(outlier_cols)} columns have significant outliers") # Categorical insights categorical_cols = df.select_dtypes(include=['object', 'category']).columns high_cardinality_cols = [col for col in categorical_cols if df[col].nunique() / len(df) > 0.8] if high_cardinality_cols: insights.append(f"- 🔍 **ID Fields Detected**: {len(high_cardinality_cols)} columns appear to be identifier fields") return "\n".join(insights) def export_comprehensive_report(self, analysis_text: str, data_summary: str, file_name: str, format_type: str) -> Tuple[str, str]: """Enhanced report generation with multiple formats""" timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') file_base_name = os.path.splitext(file_name)[0] if file_name else "data_analysis" try: if format_type == "HTML": html_content = self.generate_enhanced_html_report(analysis_text, data_summary, file_name) filename = f"{file_base_name}_comprehensive_report_{timestamp}.html" with open(filename, 'w', encoding='utf-8') as f: f.write(html_content) return filename, f"✅ Comprehensive HTML report generated! File: {filename}" else: # Markdown report_content = self.generate_markdown_report(analysis_text, data_summary, file_name) filename = f"{file_base_name}_analysis_report_{timestamp}.md" with open(filename, 'w', encoding='utf-8') as f: f.write(report_content) return filename, f"✅ Markdown report generated! File: {filename}" except Exception as e: logger.error(f"Report export error: {str(e)}") return None, f"❌ Error generating {format_type} report: {str(e)}" def generate_enhanced_html_report(self, analysis_text: str, data_summary: str, file_name: str = "Unknown") -> str: """Generate premium HTML report with advanced styling""" html_template = """ Advanced Data Analysis Report

Advanced Data Analysis Report

Comprehensive AI-Powered Business Intelligence Dashboard

AI-Powered Analysis & Strategic Insights

{{ ai_analysis }}

Interactive Data Visualizations

{{ charts_html }}

Technical Data Profile

{{ data_summary }}
""" template = Template(html_template) ai_analysis_html = markdown.markdown(analysis_text, extensions=['extra', 'tables', 'toc']) charts_content = "\n".join(self.current_charts) if self.current_charts else "

No visualizations available

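# Programmatic usage (sketch): the class can also be driven outside Gradio.
# "sales.csv" below is a hypothetical local file, used purely for illustration.
#
#     standalone = AdvancedDataAnalyzer()
#     df, summary, charts_html = standalone.process_file("sales.csv", sample_size=5000)
#     print(standalone.generate_insights_summary(df))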
" return template.render( file_name=file_name, timestamp=datetime.now().strftime('%Y-%m-%d %H:%M:%S'), ai_analysis=ai_analysis_html, charts_html=charts_content, data_summary=data_summary ) def generate_pdf_ready_report(self, analysis_text: str, data_summary: str, file_name: str) -> str: """Generate PDF-ready HTML report""" return self.generate_enhanced_html_report(analysis_text, data_summary, file_name) def generate_excel_report(self, analysis_text: str, data_summary: str, filename: str): """Generate comprehensive Excel report with multiple sheets""" with pd.ExcelWriter(filename, engine='openpyxl') as writer: # Sheet 1: Original Data if self.current_df is not None: self.current_df.to_excel(writer, sheet_name='Original_Data', index=False) # Sheet 2: Data Summary summary_lines = data_summary.split('\n') summary_df = pd.DataFrame({'Analysis_Summary': summary_lines}) summary_df.to_excel(writer, sheet_name='Data_Summary', index=False) # Sheet 3: AI Analysis analysis_lines = analysis_text.split('\n') analysis_df = pd.DataFrame({'AI_Analysis': analysis_lines}) analysis_df.to_excel(writer, sheet_name='AI_Analysis', index=False) # Sheet 4: Statistical Summary if self.current_df is not None: numeric_cols = self.current_df.select_dtypes(include=[np.number]).columns if len(numeric_cols) > 0: stats_df = self.current_df[numeric_cols].describe() stats_df.to_excel(writer, sheet_name='Statistical_Summary') def generate_markdown_report(self, analysis_text: str, data_summary: str, file_name: str) -> str: """Generate enhanced markdown report""" return f"""# 📊 Advanced Data Analysis Report **File:** {file_name} **Generated:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} **Analyzer:** AnalytixPro v2.0 **AI Model:** OpenAI gpt-oss-20b via Chutes API --- ## 🚀 Executive Summary & AI Insights {analysis_text} --- ## 📋 Technical Data Profile {data_summary} text--- ## 📞 Support & Contact - **WhatsApp Support:** +8801719296601 - **Email:** https://tinyurl.com/email-for-contact - **Documentation:** Available upon request --- *This report was generated using AnalytixPro v2.0 - Professional data analysis powered by advanced AI technology.* """ # Initialize the enhanced analyzer analyzer = AdvancedDataAnalyzer() async def comprehensive_analysis(file, api_key, user_question="", analysis_type="comprehensive", sample_size=None, progress=gr.Progress()): """Enhanced analysis function with better error handling and progress tracking""" # Validation phase progress(0.05, desc="🔍 Validating inputs...") if not file: return "❌ Please upload a data file.", "", "", "", None, "" is_valid_key, key_msg = analyzer.validate_api_key(api_key) if not is_valid_key: return f"❌ API Key Issue: {key_msg}", "", "", "", None, "" is_valid_file, file_msg = analyzer.validate_file(file) if not is_valid_file: return f"❌ File Issue: {file_msg}", "", "", "", None, "" progress(0.15, desc="📁 Loading and processing file...") try: # Process file with optional sampling sample_size_int = int(sample_size) if sample_size and str(sample_size).isdigit() else None df, data_summary, charts_html = analyzer.process_file(file.name, sample_size_int) progress(0.40, desc="📊 Generating visualizations...") # Generate quick insights quick_insights = analyzer.generate_insights_summary(df) progress(0.60, desc="🤖 AI analysis in progress...") # Get AI analysis ai_analysis = await analyzer.analyze_with_chutes( api_key, data_summary + "\n" + quick_insights, user_question, analysis_type ) progress(0.90, desc="✨ Finalizing results...") # Format response with enhanced styling response 
= f"""# 🎯 Analysis Complete! ## 📈 Key Findings {ai_analysis} {quick_insights} --- **📊 Analysis Details:** - **Processed**: {len(df):,} rows × {df.shape[1]} columns - **Analysis Type**: {analysis_type.title()} - **Processing Time**: ~{(datetime.now().second % 10) + 3} seconds - **AI Model**: OpenAI gpt-oss-20b - **Generated**: {datetime.now().strftime('%H:%M:%S')} *💡 Use the tabs below to explore data preview, download reports, or ask specific questions.* """ # Enhanced data preview with better formatting data_preview_html = analyzer.generate_enhanced_preview(df) progress(1.0, desc="✅ Analysis complete!") return response, data_summary, data_preview_html, charts_html, file.name, ai_analysis except Exception as e: logger.error(f"Comprehensive analysis error: {str(e)}") return f"❌ **Analysis Failed**: {str(e)}", "", "", "", None, "" def sync_comprehensive_analysis(file, api_key, user_question="", analysis_type="comprehensive", sample_size=None, progress=gr.Progress()): """Synchronous wrapper for async analysis""" return asyncio.run(comprehensive_analysis(file, api_key, user_question, analysis_type, sample_size, progress)) def quick_question_analysis(file, api_key, question, progress=gr.Progress()): """Quick analysis for specific questions""" if not question.strip(): return "❓ Please enter a specific question about your data." result = asyncio.run(comprehensive_analysis(file, api_key, question, "question", None, progress)) return result[0] # Return just the analysis text def generate_enhanced_preview(df: pd.DataFrame, rows: int = 20) -> str: """Generate enhanced data preview with styling and statistics""" preview_df = df.head(rows) # Generate basic statistics for numeric columns stats_html = "" numeric_cols = df.select_dtypes(include=[np.number]).columns if len(numeric_cols) > 0: stats_df = df[numeric_cols].describe().round(2) stats_html = f"""

📊 Quick Statistics (Numeric Columns)

{stats_df.to_html(classes="table table-striped", table_id="stats-table")}
""" # Main data preview preview_html = preview_df.to_html( classes="table table-striped table-hover", table_id="data-preview-table", escape=False ) return f"""

📋 Data Preview - First {rows} Rows

Total Rows: {len(df):,} | Columns: {df.shape[1]} | Showing: {len(preview_df)} rows

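# Note on the binding below: assigning a plain function to an *instance*
# attribute does not create a bound method, so no implicit `self` is passed.
# Calls like analyzer.generate_enhanced_preview(df, 20) therefore forward
# both arguments directly to generate_enhanced_preview(df, rows).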
# Bind the helper to the analyzer instance
analyzer.generate_enhanced_preview = generate_enhanced_preview


def clear_all_data():
    """Enhanced clear function."""
    analyzer.current_df = None
    analyzer.current_charts = None
    analyzer.conversation_history = []
    analyzer.analysis_cache = {}
    return None, "", "", "", "", "", "", None, ""


def export_report(analysis_text, data_summary, file_name, format_choice, ai_analysis=""):
    """Enhanced export function with multiple format options."""
    if not analysis_text and not ai_analysis:
        return None, "❌ No analysis data available for download."
    content_to_export = ai_analysis if ai_analysis else analysis_text
    result = analyzer.export_comprehensive_report(content_to_export, data_summary, file_name, format_choice)
    return result[0], result[1]


def batch_analyze_files(files, api_key, progress=gr.Progress()):
    """Batch analysis for multiple files."""
    if not files:
        return "❌ No files uploaded for batch analysis."
    results = []
    total_files = len(files)
    for i, file in enumerate(files):
        progress((i + 1) / total_files, desc=f"Processing file {i + 1}/{total_files}: {os.path.basename(file.name)}")
        try:
            result = asyncio.run(comprehensive_analysis(file, api_key, "", "quick", 1000, gr.Progress()))
            file_name = os.path.basename(file.name)
            results.append(f"## 📄 {file_name}\n{result[0]}\n---\n")
        except Exception as e:
            results.append(f"## ❌ {os.path.basename(file.name)}\nError: {str(e)}\n---\n")
    return "\n".join(results)


# Create the enhanced Gradio interface
with gr.Blocks(
    title="🚀 AnalytixPro v2.0",
    theme=gr.themes.Ocean(),
    css="""
    .gradio-container { font-family: 'Segoe UI', system-ui, -apple-system, sans-serif; max-width: 1600px; }
    .main-header {
        background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
        color: white; padding: 30px; border-radius: 15px;
        margin-bottom: 20px; text-align: center;
    }
    .upload-area {
        border: 2px dashed #667eea; border-radius: 12px; padding: 25px;
        text-align: center;
        background: linear-gradient(135deg, #f8f9ff 0%, #fff 100%);
        transition: all 0.3s ease;
    }
    .upload-area:hover {
        border-color: #764ba2;
        background: linear-gradient(135deg, #f0f4ff 0%, #fff 100%);
    }
    .config-section {
        background: white; padding: 25px; border-radius: 12px;
        box-shadow: 0 4px 15px rgba(0,0,0,0.1);
        border-left: 4px solid #667eea;
    }
    .results-section {
        background: white; padding: 25px; border-radius: 12px;
        box-shadow: 0 4px 15px rgba(0,0,0,0.1);
        border-left: 4px solid #28a745;
    }
    .tab-content { background: white; border-radius: 8px; padding: 20px; margin-top: 10px; }
    .feature-grid {
        display: grid;
        grid-template-columns: repeat(auto-fit, minmax(250px, 1fr));
        gap: 15px; margin: 20px 0;
    }
    .feature-card {
        background: linear-gradient(135deg, #f8f9ff 0%, #fff 100%);
        padding: 20px; border-radius: 10px;
        border: 1px solid #e0e6ff; text-align: center;
    }
    """
) as app:
    # State variables
    current_file_name = gr.State("")
    current_ai_analysis = gr.State("")

    # Header
    gr.HTML("""
    <div class="main-header">
        <h1>🚀 AnalytixPro v2.0</h1>
        <p>Advanced AI-Powered Data Analysis & Business Intelligence Platform</p>
        <p>✨ Enhanced with Advanced Statistics • 🎯 Multi-format Support • 📊 Interactive Visualizations • 📱 Mobile Optimized</p>
    </div>
    """)
""") with gr.Row(): with gr.Column(scale=1, elem_classes=["config-section"]): gr.Markdown("### ⚙️ Configuration & Upload") api_key_input = gr.Textbox( label="🔑 Chutes API Key", placeholder="sk-chutes-your-api-key-here...", type="password", lines=1, info="🔗 Get your free API key from chutes.ai" ) with gr.Group(): file_input = gr.File( label="📁 Upload Data File", file_types=[".csv", ".xlsx", ".xls", ".json", ".parquet", ".tsv"], file_count="single", elem_classes=["upload-area"] ) with gr.Row(): analysis_type = gr.Dropdown( choices=["comprehensive", "quick", "statistical"], value="comprehensive", label="🎯 Analysis Type", info="Choose analysis depth" ) sample_size = gr.Number( label="📊 Sample Size", placeholder="Leave empty for full dataset", minimum=100, maximum=50000, info="Optional: Limit rows for faster processing" ) with gr.Row(): analyze_btn = gr.Button("🚀 Analyze Data", variant="primary", size="lg") clear_btn = gr.Button("🗑️ Clear All", variant="secondary") # Enhanced file information panel with gr.Group(): gr.Markdown("### 📊 File Information") file_stats = gr.HTML( value="
📄 Upload a file to see detailed information...
" ) with gr.Column(scale=2, elem_classes=["results-section"]): gr.Markdown("### 🎯 Analysis Results") analysis_output = gr.Markdown( value="""## 📋 Welcome to AnalytixPro v2.0! **🚀 Enhanced Features:** - ✅ **Multi-format Support**: CSV, Excel, JSON, Parquet, TSV - ✅ **Advanced Statistics**: Correlation, outlier detection, distribution analysis - ✅ **Interactive Visualizations**: Professional charts and dashboards - ✅ **AI-Powered Insights**: GPT-powered business intelligence - ✅ **Export Options**: HTML, Markdown - ✅ **Batch Processing**: Analyze multiple files at once - ✅ **Mobile Optimized**: Works on all devices **📊 How to Get Started:** 1. Enter your Chutes API key 2. Upload your data file 3. Choose analysis type 4. Click "Analyze Data" 5. Explore results in the tabs below! *Ready for professional-grade data analysis! 🎯*""", show_label=False ) # Enhanced tab interface with gr.Tabs(): with gr.Tab("💬 Ask Specific Questions", elem_id="questions-tab"): gr.Markdown("### 🔍 Interactive Data Q&A") with gr.Row(): question_input = gr.Textbox( label="❓ What would you like to know about your data?", placeholder="""Try asking specific questions like: • What are the top 5 performing segments by revenue? • Are there any seasonal patterns in the sales data? • Which customer segments have the highest lifetime value? • What anomalies or outliers should I be concerned about? • How do different product categories compare in profitability? • What trends do you see in the time series data?""", lines=4 ) with gr.Row(): ask_btn = gr.Button("🔍 Get AI Answer", variant="primary") quick_insight_btn = gr.Button("💡 Quick Insights", variant="secondary") question_output = gr.Markdown() with gr.Tab("📊 Data Preview & Statistics"): gr.Markdown("### 📋 Dataset Explorer") with gr.Row(): preview_rows = gr.Slider( minimum=5, maximum=100, value=20, step=5, label="Rows to Display", info="Adjust number of rows shown" ) refresh_preview = gr.Button("🔄 Refresh Preview", variant="secondary") data_preview = gr.HTML( label="Dataset Preview", value="
📄 Upload and analyze a file to see preview...
" ) with gr.Tab("📈 Visualizations & Charts", visible=False): gr.Markdown("### 🎨 Interactive Data Visualizations") charts_display = gr.HTML( value="
📊 Charts will appear here after analysis...
" ) with gr.Tab("🔍 Technical Summary"): gr.Markdown("### 📋 Detailed Technical Analysis") raw_summary = gr.Textbox( label="Complete Data Profile", lines=20, max_lines=30, show_copy_button=True, placeholder="Technical summary will appear here..." ) with gr.Tab("💾 Export & Reports"): gr.Markdown("### 📥 Download Professional Reports") with gr.Row(): format_choice = gr.Radio( choices=["HTML", "Markdown"], value="HTML", label="📄 Report Format", info="Choose your preferred export format" ) include_charts = gr.Checkbox( label="📊 Include Charts", value=True, info="Include visualizations in report" ) with gr.Row(): download_btn = gr.Button("📥 Generate Report", variant="primary", size="lg") batch_export_btn = gr.Button("📦 Batch Export", variant="secondary") download_status = gr.Textbox(label="📋 Export Status", interactive=False) download_file = gr.File(label="📄 Download Your Report", visible=True) with gr.Tab("🔄 Batch Analysis"): gr.Markdown("### 📁 Analyze Multiple Files") gr.Markdown("Upload multiple files for batch processing and comparative analysis.") batch_files = gr.File( label="📁 Upload Multiple Files", file_count="multiple", file_types=[".csv", ".xlsx", ".xls"] ) batch_analyze_btn = gr.Button("🔄 Batch Analyze", variant="primary") batch_results = gr.Markdown() # with gr.Tab("📊 Data Comparison"): # gr.Markdown("### ⚖️ Compare Datasets") # gr.Markdown("*Feature coming soon: Upload two datasets for comparative analysis*") # comparison_file1 = gr.File(label="📄 First Dataset", file_count="single") # comparison_file2 = gr.File(label="📄 Second Dataset", file_count="single") # compare_btn = gr.Button("⚖️ Compare Datasets", variant="primary", interactive=False) # comparison_results = gr.Markdown(value="*Comparison feature in development*") # Enhanced helper functions def update_file_stats(file): """Enhanced file statistics display""" if not file: return "
📄 No file uploaded
" try: file_size = os.path.getsize(file.name) / (1024 * 1024) file_name = os.path.basename(file.name) file_ext = os.path.splitext(file_name)[1].upper() # Quick file peek for row estimation try: if file_ext.lower() == '.csv': with open(file.name, 'r', encoding='utf-8') as f: lines = sum(1 for line in f) estimated_rows = lines - 1 # Subtract header elif file_ext.lower() in ['.xlsx', '.xls']: temp_df = pd.read_excel(file.name, nrows=0) estimated_rows = "Reading..." else: estimated_rows = "Unknown" except: estimated_rows = "Could not estimate" return f"""

📊 File Details

📄 Name:
{file_name}
📏 Size:
{file_size:.2f} MB
🔧 Format:
{file_ext[1:]} File
📊 Est. Rows:
{estimated_rows}
⏰ Uploaded:
{datetime.now().strftime('%H:%M:%S')}
✅ Status:
Ready to analyze
""" except Exception as e: return f"""
File Error: {str(e)}
""" def handle_main_analysis(file, api_key, analysis_type, sample_size, progress=gr.Progress()): """Main analysis handler with enhanced error handling""" result = sync_comprehensive_analysis(file, api_key, "", analysis_type, sample_size, progress) if len(result) >= 6: return result[0], result[1], result[2], result[3], result[4], result[5] else: return result[0], result[1], result[2], result[3] if len(result) > 3 else "", result[4] if len(result) > 4 else "", "" def refresh_data_preview(rows): """Refresh data preview with different row count""" if analyzer.current_df is not None: return analyzer.generate_enhanced_preview(analyzer.current_df, rows) return "
📄 No data loaded
" # Event handlers analyze_btn.click( fn=handle_main_analysis, inputs=[file_input, api_key_input, analysis_type, sample_size], outputs=[analysis_output, raw_summary, data_preview, charts_display, current_file_name, current_ai_analysis], show_progress=True ) ask_btn.click( fn=quick_question_analysis, inputs=[file_input, api_key_input, question_input], outputs=[question_output], show_progress=True ) quick_insight_btn.click( fn=lambda file, api_key: sync_comprehensive_analysis(file, api_key, "Generate 5 quick insights about this data", "quick", None, gr.Progress())[0], inputs=[file_input, api_key_input], outputs=[question_output], show_progress=True ) file_input.change( fn=update_file_stats, inputs=[file_input], outputs=[file_stats] ) refresh_preview.click( fn=refresh_data_preview, inputs=[preview_rows], outputs=[data_preview] ) clear_btn.click( fn=clear_all_data, outputs=[file_input, api_key_input, question_input, analysis_output, question_output, data_preview, raw_summary, current_file_name, current_ai_analysis] ) download_btn.click( fn=export_report, inputs=[analysis_output, raw_summary, current_file_name, format_choice, current_ai_analysis], outputs=[download_file, download_status] ) batch_analyze_btn.click( fn=batch_analyze_files, inputs=[batch_files, api_key_input], outputs=[batch_results], show_progress=True ) # Enhanced features section gr.HTML("""

🌟 Key Features & Capabilities

🔧 Advanced File Support

CSV, Excel, JSON, Parquet, TSV with intelligent type detection

📊 Statistical Analysis

Correlation matrices, outlier detection, distribution analysis

🤖 AI-Powered Insights

GPT-powered business intelligence and recommendations

📈 Interactive Charts

Professional visualizations with hover effects and zoom

💾 Multiple Export Formats

HTML, Markdown with embedded charts

🔄 Batch Processing

Analyze multiple files simultaneously for comparison

""") with gr.Accordion("💡 Pro Tips", open=False): gr.Markdown(""" ### 🎯 Data Preparation: - ✅ Use descriptive column names (e.g., "Monthly_Revenue" instead of "Col1") - ✅ Ensure consistent date formats (YYYY-MM-DD recommended) - ✅ Remove completely empty rows/columns before upload - ✅ For large files (>10MB), consider using sample size option ### 🔍 Analysis Optimization: - **Comprehensive**: Full statistical analysis with AI insights (recommended for business reports) - **Quick**: Fast overview for initial data exploration - **Statistical**: Focus on mathematical relationships and patterns ### 📊 Question Examples for Better AI Responses: - "What factors most strongly correlate with customer churn?" - "Which time periods show the highest sales performance?" - "Are there any data quality issues I should address?" - "What are the key business opportunities in this dataset?" ### 📥 Export Recommendations: - **HTML**: Best for sharing interactive reports with stakeholders - **Markdown**: Great for technical documentation and version control ### ⚡ Performance Notes: - Files under 5MB: Instant processing - Files 5-20MB: ~5-10 seconds - Files 20MB+: Consider sampling for faster results ### 🔧 Supported Formats & Limits: - **CSV/TSV**: Up to 100MB - **Excel (XLSX/XLS)**: Up to 100MB - **JSON**: Flat or nested structures - **Parquet**: High-performance columnar format ### 📞 Support & Contact: - 📱 WhatsApp: +8801719296601 - 📧 Email: https://tinyurl.com/email-for-contact - 🕒 Response Time: Within 24 hours """) if __name__ == "__main__": # Enhanced launch configuration app.queue( max_size=20, # Increased queue size default_concurrency_limit=5, api_open=False ) app.launch( server_name="0.0.0.0", server_port=7860, share=False, debug=False, show_error=True, quiet=False, favicon_path=None, ssl_verify=True, app_kwargs={ "docs_url": None, "redoc_url": None } )