import os
import tempfile
import base64
from io import BytesIO
from typing import List, Dict, Any, Optional, Tuple
from dataclasses import dataclass
from pathlib import Path

import cv2
import numpy as np
import torch
import onnxruntime as rt
from PIL import Image
import gradio as gr
import pandas as pd
from transformers import pipeline
from huggingface_hub import hf_hub_download

# Import necessary function from aesthetic_predictor_v2_5
from aesthetic_predictor_v2_5 import convert_v2_5_from_siglip


@dataclass
class EvaluationResult:
    """Scores collected for one image.

    Each per-model field stays ``None`` until that model has produced a
    score; ``final_score`` is the mean of the selected, available scores.
    """

    file_name: str
    image: Image.Image
    aesthetic_shadow: Optional[float] = None
    waifu_scorer: Optional[float] = None
    aesthetic_v2_5: Optional[float] = None
    anime_aesthetic: Optional[float] = None
    final_score: Optional[float] = None


class MLP(torch.nn.Module):
    """Feed-forward regression head mapping a feature vector to one scalar."""

    def __init__(self, input_size: int = 768):
        """Build the layer stack for ``input_size``-dimensional inputs."""
        super().__init__()
        self.network = torch.nn.Sequential(
            torch.nn.Linear(input_size, 1024),
            torch.nn.ReLU(),
            torch.nn.BatchNorm1d(1024),
            torch.nn.Dropout(0.2),
            torch.nn.Linear(1024, 256),
            torch.nn.ReLU(),
            torch.nn.BatchNorm1d(256),
            torch.nn.Dropout(0.1),
            torch.nn.Linear(256, 64),
            torch.nn.ReLU(),
            torch.nn.Linear(64, 1),
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Run ``x`` through the network and return the raw prediction."""
        return self.network(x)


class ModelLoader:
    """Load every scoring model once and keep them in ``self.models``.

    A model that fails to load is stored as ``None`` so that evaluation
    code can skip it instead of crashing.
    """

    # Keys under which the loaders register their models.
    _MODEL_KEYS = (
        'aesthetic_shadow',
        'waifu_scorer',
        'aesthetic_v2_5',
        'anime_aesthetic',
    )

    def __init__(self, device: Optional[str] = None):
        """Pick a device (CUDA when available unless overridden) and load models."""
        self.device = device or ('cuda' if torch.cuda.is_available() else 'cpu')
        self.models = {}
        self._load_all_models()

    def _load_all_models(self):
        """Run every loader independently and report the outcome.

        Each loader is isolated so that a failure in one model does not
        prevent the remaining models from being loaded.
        """
        loaders = (
            self._load_aesthetic_shadow,
            self._load_waifu_scorer,
            self._load_aesthetic_v2_5,
            self._load_anime_aesthetic,
        )
        for load in loaders:
            try:
                load()
            except Exception as e:
                print(f"❌ Error loading models: {e}")

        missing = [key for key in self._MODEL_KEYS if self.models.get(key) is None]
        if missing:
            print(f"⚠️ Models unavailable: {', '.join(missing)}")
        else:
            print("✅ All models loaded successfully!")

    def _load_aesthetic_shadow(self):
        """Load the Aesthetic Shadow image-classification pipeline."""
        print("🔄 Loading Aesthetic Shadow...")
        try:
            self.models['aesthetic_shadow'] = pipeline(
                "image-classification",
                model="NeoChen1024/aesthetic-shadow-v2-backup",
                device=self.device,
            )
        except Exception as e:
            print(f"⚠️ Aesthetic Shadow not available: {e}")
            self.models['aesthetic_shadow'] = None

    def _load_waifu_scorer(self):
        """Load the Waifu Scorer MLP head together with its CLIP backbone."""
        print("🔄 Loading Waifu Scorer...")
        try:
            import clip

            # Load MLP
            model_path = hf_hub_download("Eugeoter/waifu-scorer-v3", "model.pth")
            mlp = MLP()
            # NOTE(security): torch.load unpickles the downloaded checkpoint;
            # only use checkpoints from a trusted source.
            state_dict = torch.load(model_path, map_location=self.device)
            mlp.load_state_dict(state_dict)
            mlp.to(self.device).eval()

            # Load CLIP
            clip_model, preprocess = clip.load("ViT-L/14", device=self.device)

            self.models['waifu_scorer'] = {
                'mlp': mlp,
                'clip_model': clip_model,
                'preprocess': preprocess,
            }
        except Exception as e:
            print(f"⚠️ Waifu Scorer not available: {e}")
            self.models['waifu_scorer'] = None

    def _load_aesthetic_v2_5(self):
        """Load Aesthetic Predictor V2.5 and its preprocessor."""
        print("🔄 Loading Aesthetic V2.5...")
        try:
            model, preprocessor = convert_v2_5_from_siglip(
                low_cpu_mem_usage=True,
                trust_remote_code=True,
            )
            if torch.cuda.is_available():
                model = model.to(torch.bfloat16).cuda()

            self.models['aesthetic_v2_5'] = {
                'model': model,
                'preprocessor': preprocessor,
            }
        except Exception as e:
            print(f"⚠️ Aesthetic V2.5 not available: {e}")
            self.models['aesthetic_v2_5'] = None

    def _load_anime_aesthetic(self):
        """Load the Anime Aesthetic ONNX model on the CPU execution provider."""
        print("🔄 Loading Anime Aesthetic...")
        try:
            model_path = hf_hub_download("skytnt/anime-aesthetic", "model.onnx")
            self.models['anime_aesthetic'] = rt.InferenceSession(
                model_path,
                providers=['CPUExecutionProvider'],
            )
        except Exception as e:
            print(f"⚠️ Anime Aesthetic not available: {e}")
            self.models['anime_aesthetic'] = None


class ImageEvaluator:
    """Score images with the loaded models, batch by batch."""

    def __init__(self):
        """Load all models and create a temp directory for exported files."""
        self.loader = ModelLoader()
        self.temp_dir = Path(tempfile.mkdtemp())

    def evaluate_images(
        self,
        images: List[Image.Image],
        file_names: List[str],
        selected_models: List[str],
        batch_size: int = 4,
        progress_callback=None,
    ) -> List[EvaluationResult]:
        """Evaluate ``images`` with ``selected_models`` in batches.

        Args:
            images: Images to score.
            file_names: Display names, parallel to ``images``.
            selected_models: Model keys (``EvaluationResult`` field names).
            batch_size: Images per batch; values below 1 are treated as 1.
            progress_callback: Optional ``callback(fraction, message)``
                invoked before each batch is processed.

        Returns:
            One ``EvaluationResult`` per image, in input order.
        """
        # A non-positive step would make range() raise ValueError.
        batch_size = max(1, int(batch_size))
        results = []
        total_batches = (len(images) + batch_size - 1) // batch_size

        for batch_idx in range(0, len(images), batch_size):
            batch_images = images[batch_idx:batch_idx + batch_size]
            batch_names = file_names[batch_idx:batch_idx + batch_size]

            # Update progress
            if progress_callback:
                batch_number = batch_idx // batch_size + 1
                progress_callback(
                    batch_number / total_batches,
                    f"Processing batch {batch_number}/{total_batches}",
                )

            # Process batch
            results.extend(
                self._process_batch(batch_images, batch_names, selected_models)
            )

        return results

    def _process_batch(
        self,
        images: List[Image.Image],
        file_names: List[str],
        selected_models: List[str],
    ) -> List[EvaluationResult]:
        """Score one batch with each selected model and compute final scores."""
        batch_results = [
            EvaluationResult(file_name=name, image=img)
            for img, name in zip(images, file_names)
        ]

        # Model key (also the EvaluationResult field name) -> evaluator.
        evaluators = (
            ('aesthetic_shadow', self._eval_aesthetic_shadow),
            ('waifu_scorer', self._eval_waifu_scorer),
            ('aesthetic_v2_5', self._eval_aesthetic_v2_5),
            ('anime_aesthetic', self._eval_anime_aesthetic),
        )
        for key, evaluate in evaluators:
            if key not in selected_models:
                continue
            for result, score in zip(batch_results, evaluate(images)):
                setattr(result, key, score)

        # Calculate final scores
        for result in batch_results:
            result.final_score = self._calculate_final_score(result, selected_models)

        return batch_results

    def _eval_aesthetic_shadow(self, images: List[Image.Image]) -> List[Optional[float]]:
        """Score images with Aesthetic Shadow.

        The ``'hq'`` label score is scaled by 10 and clipped to [0, 10].
        An image whose prediction has no usable ``'hq'`` entry gets ``None``;
        if the pipeline call itself fails, every image gets ``None``.
        """
        model = self.loader.models.get('aesthetic_shadow')
        if model is None:
            return [None] * len(images)

        try:
            predictions = model(images)
            scores = []
            for prediction in predictions:
                try:
                    hq_score = next(
                        (p['score'] for p in prediction if p['label'] == 'hq'),
                        None,
                    )
                    if hq_score is None:
                        scores.append(None)
                    else:
                        scores.append(float(np.clip(hq_score * 10.0, 0.0, 10.0)))
                except (KeyError, TypeError, ValueError):
                    # Malformed prediction entry: skip this image only.
                    scores.append(None)
            return scores
        except Exception as e:
            print(f"Error in Aesthetic Shadow: {e}")
            return [None] * len(images)

    def _eval_waifu_scorer(self, images: List[Image.Image]) -> List[Optional[float]]:
        """Score images with Waifu Scorer (CLIP features fed to the MLP head).

        Predictions are clamped to [0, 10]. Any failure yields ``None`` for
        every image in the batch.
        """
        model_dict = self.loader.models.get('waifu_scorer')
        if not model_dict:
            return [None] * len(images)

        try:
            with torch.no_grad():
                # Preprocess images
                image_tensors = [
                    model_dict['preprocess'](img).unsqueeze(0) for img in images
                ]
                if len(image_tensors) == 1:
                    image_tensors = image_tensors * 2  # CLIP requirement

                image_batch = torch.cat(image_tensors).to(self.loader.device)
                image_features = model_dict['clip_model'].encode_image(image_batch)

                # Normalize features
                norm = image_features.norm(2, dim=-1, keepdim=True)
                norm[norm == 0] = 1
                # The MLP is loaded without a dtype cast, so cast the
                # embedding to float32 in case CLIP returned half precision.
                im_emb = (image_features / norm).to(self.loader.device).float()

                predictions = model_dict['mlp'](im_emb)
                scores = predictions.clamp(0, 10).cpu().numpy().flatten().tolist()

                # Drop the duplicate added for the single-image case.
                return scores[:len(images)]
        except Exception as e:
            print(f"Error in Waifu Scorer: {e}")
            return [None] * len(images)

    def _eval_aesthetic_v2_5(self, images: List[Image.Image]) -> List[Optional[float]]:
        """Score images with Aesthetic Predictor V2.5, clipped to [0, 10].

        Any failure yields ``None`` for every image in the batch.
        """
        model_dict = self.loader.models.get('aesthetic_v2_5')
        if not model_dict:
            return [None] * len(images)

        try:
            rgb_images = [img.convert("RGB") for img in images]
            pixel_values = model_dict['preprocessor'](
                images=rgb_images, return_tensors="pt"
            ).pixel_values

            # Mirror the dtype/device the loader applied to the model.
            if torch.cuda.is_available():
                pixel_values = pixel_values.to(torch.bfloat16).cuda()

            with torch.inference_mode():
                logits = model_dict['model'](pixel_values).logits
                # Flatten so a single-image batch still yields a 1-D array.
                scores = logits.float().cpu().numpy().reshape(-1)

            return [float(np.clip(s, 0.0, 10.0)) for s in scores.tolist()]
        except Exception as e:
            print(f"Error in Aesthetic V2.5: {e}")
            return [None] * len(images)

    def _eval_anime_aesthetic(self, images: List[Image.Image]) -> List[Optional[float]]:
        """Score images with the Anime Aesthetic ONNX model.

        Each image is resized to fit a 768x768 black canvas (aspect ratio
        preserved, centred) and the prediction is scaled by 10 and clipped
        to [0, 10]. Images that fail individually get ``None``.
        """
        model = self.loader.models.get('anime_aesthetic')
        if model is None:
            return [None] * len(images)

        s = 768
        scores = []
        for img in images:
            try:
                # Preprocess image; force 3 channels so it fits the canvas.
                img_np = np.array(img.convert("RGB")).astype(np.float32) / 255.0
                h, w = img_np.shape[:2]

                # Clamp to at least 1 pixel for extreme aspect ratios.
                if h > w:
                    new_h, new_w = s, max(1, int(s * w / h))
                else:
                    new_h, new_w = max(1, int(s * h / w)), s

                resized = cv2.resize(img_np, (new_w, new_h))
                canvas = np.zeros((s, s, 3), dtype=np.float32)

                pad_h = (s - new_h) // 2
                pad_w = (s - new_w) // 2
                canvas[pad_h:pad_h + new_h, pad_w:pad_w + new_w] = resized

                # HWC -> CHW with a leading batch axis.
                input_tensor = np.transpose(canvas, (2, 0, 1))[np.newaxis, :]
                pred = model.run(None, {"img": input_tensor})[0].item()
                scores.append(float(np.clip(pred * 10.0, 0.0, 10.0)))
            except Exception as e:
                print(f"Error processing image: {e}")
                scores.append(None)

        return scores

    def _calculate_final_score(
        self, result: EvaluationResult, selected_models: List[str]
    ) -> Optional[float]:
        """Return the mean of the selected models' scores, or ``None`` if none exist."""
        candidates = (getattr(result, model, None) for model in selected_models)
        scores = [score for score in candidates if score is not None]
        return float(np.mean(scores)) if scores else None

    def results_to_dataframe(self, results: List[EvaluationResult]) -> pd.DataFrame:
        """Convert results to a DataFrame.

        Every row has ``File Name`` and ``Final Score``; a per-model column
        is filled only for rows where that model produced a score.
        """
        optional_columns = (
            ('Aesthetic Shadow', 'aesthetic_shadow'),
            ('Waifu Scorer', 'waifu_scorer'),
            ('Aesthetic V2.5', 'aesthetic_v2_5'),
            ('Anime Aesthetic', 'anime_aesthetic'),
        )

        data = []
        for result in results:
            row = {
                'File Name': result.file_name,
                'Final Score': result.final_score,
            }
            for column, attr in optional_columns:
                value = getattr(result, attr)
                if value is not None:
                    row[column] = value
            data.append(row)

        return pd.DataFrame(data)

    def optimize_batch_size(self, sample_images: List[Image.Image]) -> int:
        """Probe for a workable batch size by doubling until a run fails.

        The first sample image is repeated to build each probe batch, which
        is run through the Aesthetic Shadow pipeline when that model is
        loaded. Returns the last size that succeeded, capped at 8, and 1
        for an empty sample list.
        """
        if not sample_images:
            return 1

        test_image = sample_images[0]
        batch_size = 1
        # NOTE(review): the probe ceiling is tied to the number of samples
        # even though one image is replicated; a caller passing two samples
        # can therefore never get more than 2 — confirm this is intended.
        max_test = min(16, len(sample_images))

        while batch_size <= max_test:
            try:
                test_batch = [test_image] * batch_size
                # Test with a lightweight model
                probe = self.loader.models.get('aesthetic_shadow')
                if probe is not None:
                    _ = probe(test_batch)
                batch_size *= 2
            except Exception:
                break

        optimal = max(1, batch_size // 2)
        return min(optimal, 8)  # Cap at reasonable size


def create_interface():
    """Build and return the Gradio Blocks app.

    Instantiates an ``ImageEvaluator`` (which loads all models), lays out
    the upload/model-selection controls and the results table, and wires
    the evaluate, re-score, export and clear handlers.
    """
    evaluator = ImageEvaluator()

    # Available models
    model_choices = [
        ("Aesthetic Shadow", "aesthetic_shadow"),
        ("Waifu Scorer", "waifu_scorer"),
        ("Aesthetic V2.5", "aesthetic_v2_5"),
        ("Anime Aesthetic", "anime_aesthetic"),
    ]
    available_models = [choice[1] for choice in model_choices]

    with gr.Blocks(title="Image Evaluation Tool", theme=gr.themes.Soft()) as app:
        gr.Markdown("""
        # 🎨 Modern Image Evaluation Tool

        Upload images to evaluate them using state-of-the-art aesthetic and quality prediction models.

        **Features:**
        - Multiple AI models for comprehensive evaluation
        - Batch processing with automatic optimization
        - Interactive results table with sorting and filtering
        - CSV export functionality
        - Real-time progress tracking
        """)

        with gr.Row():
            with gr.Column(scale=1):
                # Input components
                input_files = gr.File(
                    label="📁 Upload Images",
                    file_count="multiple",
                    file_types=["image"],
                )

                model_selection = gr.CheckboxGroup(
                    choices=model_choices,
                    value=available_models,
                    label="🤖 Select Models",
                    info="Choose which models to use for evaluation",
                )

                with gr.Row():
                    auto_batch = gr.Checkbox(
                        label="🔄 Auto Batch Size",
                        value=True,
                        info="Automatically optimize batch size",
                    )

                    manual_batch = gr.Slider(
                        minimum=1,
                        maximum=16,
                        value=4,
                        step=1,
                        label="📊 Batch Size",
                        interactive=False,
                        info="Manual batch size (when auto is disabled)",
                    )

                evaluate_btn = gr.Button(
                    "🚀 Evaluate Images",
                    variant="primary",
                    size="lg",
                )

                clear_btn = gr.Button("🗑️ Clear Results", variant="secondary")

            with gr.Column(scale=2):
                # Status (progress is reported through the gr.Progress
                # argument injected into process_images)
                status_text = gr.Textbox(
                    label="📊 Status",
                    interactive=False,
                    max_lines=2,
                )

                # Results display
                results_table = gr.DataFrame(
                    label="📋 Evaluation Results",
                    interactive=False,
                    wrap=True,
                    max_height=400,
                )

                # Export functionality
                with gr.Row():
                    export_csv = gr.Button("📥 Export CSV", variant="secondary")
                    download_file = gr.File(
                        label="💾 Download",
                        visible=False,
                    )

        # State management
        results_state = gr.State([])

        # Event handlers
        def toggle_batch_slider(auto_enabled):
            """Enable the manual slider only when auto batching is off."""
            return gr.update(interactive=not auto_enabled)

        def process_images(files, models, auto_batch_enabled, manual_batch_size,
                           progress=gr.Progress()):
            """Load the uploads, score them, and return (status, table, results)."""
            if not files or not models:
                return "❌ Please upload images and select at least one model", pd.DataFrame(), []

            try:
                # Load images
                images = []
                file_names = []

                progress(0.1, "📂 Loading images...")

                for file in files:
                    # Uploads may arrive as path strings or objects with .name.
                    path = getattr(file, "name", file)
                    try:
                        # Context manager closes the file handle once the
                        # converted copy has been made.
                        with Image.open(path) as source:
                            img = source.convert("RGB")
                        images.append(img)
                        file_names.append(os.path.basename(path))
                    except Exception as e:
                        print(f"Error loading {path}: {e}")

                if not images:
                    return "❌ No valid images loaded", pd.DataFrame(), []

                # Determine batch size
                if auto_batch_enabled:
                    batch_size = evaluator.optimize_batch_size(images[:2])
                    progress(0.2, f"🔧 Optimized batch size: {batch_size}")
                else:
                    batch_size = int(manual_batch_size)

                # Process images; map batch progress onto the 0.2-0.9 range.
                def progress_callback(prog, msg):
                    progress(0.2 + prog * 0.7, msg)

                results = evaluator.evaluate_images(
                    images, file_names, models, batch_size, progress_callback
                )

                progress(0.95, "📊 Generating results table...")

                # Convert to DataFrame
                df = evaluator.results_to_dataframe(results)
                df = df.sort_values('Final Score', ascending=False, na_position='last')

                progress(1.0, f"✅ Processed {len(results)} images successfully!")

                return f"✅ Evaluated {len(results)} images using {len(models)} models", df, results

            except Exception as e:
                return f"❌ Error during processing: {str(e)}", pd.DataFrame(), []

        def update_results_table(models, current_results):
            """Recompute final scores for the current model selection."""
            if not current_results:
                return pd.DataFrame()

            # Recalculate final scores based on selected models
            for result in current_results:
                result.final_score = evaluator._calculate_final_score(result, models)

            df = evaluator.results_to_dataframe(current_results)
            return df.sort_values('Final Score', ascending=False, na_position='last')

        def export_results(current_results):
            """Write the current results to CSV and reveal the download widget."""
            if not current_results:
                return gr.update(visible=False)

            df = evaluator.results_to_dataframe(current_results)
            csv_path = evaluator.temp_dir / "evaluation_results.csv"
            df.to_csv(csv_path, index=False)

            return gr.update(value=str(csv_path), visible=True)

        def clear_all():
            """Reset status, table, stored results, and the download widget."""
            return (
                "🔄 Ready for new evaluation",
                pd.DataFrame(),
                [],
                gr.update(visible=False),
            )

        # Wire up events
        auto_batch.change(
            toggle_batch_slider,
            inputs=[auto_batch],
            outputs=[manual_batch],
        )

        evaluate_btn.click(
            process_images,
            inputs=[input_files, model_selection, auto_batch, manual_batch],
            outputs=[status_text, results_table, results_state],
        )

        model_selection.change(
            update_results_table,
            inputs=[model_selection, results_state],
            outputs=[results_table],
        )

        export_csv.click(
            export_results,
            inputs=[results_state],
            outputs=[download_file],
        )

        clear_btn.click(
            clear_all,
            outputs=[status_text, results_table, results_state, download_file],
        )

        # Initial setup: the returned message must be routed to a component,
        # otherwise it is never displayed.
        app.load(
            lambda: "🔄 Ready for evaluation - Upload images to get started!",
            outputs=[status_text],
        )

    return app


if __name__ == "__main__":
    app = create_interface()
    app.queue(max_size=10).launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=False,
        show_error=True,
    )