import os
import asyncio
from typing import List, Dict, Optional, Tuple, Any
from dataclasses import dataclass, field
from pathlib import Path
import logging

import cv2
import numpy as np
import torch
import onnxruntime as rt
from PIL import Image
import gradio as gr
from transformers import pipeline
from huggingface_hub import hf_hub_download
import pandas as pd

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Import aesthetic predictor function.
# Ensure 'aesthetic_predictor_v2_5.py' is in the same directory or accessible in PYTHONPATH:
# from aesthetic_predictor_v2_5 import convert_v2_5_from_siglip


def convert_v2_5_from_siglip(low_cpu_mem_usage=True, trust_remote_code=True):
    """Placeholder for the real ``convert_v2_5_from_siglip`` import.

    Returns a ``(model, preprocessor)`` pair made of a dummy linear model and
    a preprocessor that emits random ``pixel_values``. Both arguments are
    accepted for signature compatibility and are ignored.
    """
    logger.warning("Using placeholder for convert_v2_5_from_siglip. Ensure the actual implementation is available.")
    mock_model = torch.nn.Sequential(torch.nn.Linear(10, 1))  # Dummy model

    def mock_preprocessor(images, return_tensors):
        # Dummy preprocessor: one random 3x224x224 tensor per input image.
        return {"pixel_values": torch.randn(len(images), 3, 224, 224)}

    return mock_model, mock_preprocessor


@dataclass
class EvaluationResult:
    """Data class for storing image evaluation results"""
    file_name: str
    image_path: str
    # Maps model key -> score; None marks a model that failed on this image.
    scores: Dict[str, Optional[float]] = field(default_factory=dict)
    final_score: Optional[float] = None

    def calculate_final_score(self, selected_models: List[str]) -> None:
        """Set ``final_score`` to the mean of the non-None scores of the selected models.

        ``final_score`` becomes ``None`` when no selected model produced a score.
        """
        valid_scores = [
            score
            for model, score in self.scores.items()
            if model in selected_models and score is not None
        ]
        # Cast so the attribute is a plain float rather than numpy.float64.
        self.final_score = float(np.mean(valid_scores)) if valid_scores else None


class BaseModel:
    """Base class for all evaluation models"""

    def __init__(self, name: str):
        self.name = name
        self.device = 'cuda' if torch.cuda.is_available() else 'cpu'

    async def evaluate_batch(self, images: List[Image.Image]) -> List[Optional[float]]:
        """Evaluate a batch of images; subclasses return one score (or None) per image."""
        raise NotImplementedError


class AestheticShadowModel(BaseModel):
    """Aesthetic Shadow V2 model implementation"""

    def __init__(self):
        super().__init__("Aesthetic Shadow")
        logger.info(f"Loading {self.name} model...")
        self.model = pipeline(
            "image-classification",
            model="NeoChen1024/aesthetic-shadow-v2-backup",
            device=0 if self.device == 'cuda' else -1
        )

    async def evaluate_batch(self, images: List[Image.Image]) -> List[Optional[float]]:
        """Return the 'hq' class probability scaled to 0-10 for each image.

        On any error the whole batch is reported as ``None``.
        """
        if not images:
            return []
        try:
            results = self.model(images)
            # A flat list of label dicts is the prediction set of ONE image;
            # wrap it so the loop below always walks one entry per image.
            if results and isinstance(results[0], dict):
                results = [results]

            scores = []
            for predictions in results:
                if not isinstance(predictions, list):
                    predictions = [predictions]
                # Handle the pipeline wrapping a single image's result in an extra list.
                if len(images) == 1 and predictions and isinstance(predictions[0], list):
                    predictions = predictions[0]
                hq_score = next((p['score'] for p in predictions if p['label'] == 'hq'), 0)
                scores.append(float(np.clip(hq_score * 10.0, 0.0, 10.0)))
            return scores
        except Exception as e:
            logger.error(f"Error in {self.name}: {e}")
            return [None] * len(images)


class WaifuScorerModel(BaseModel):
    """Waifu Scorer V3 model implementation"""

    def __init__(self):
        super().__init__("Waifu Scorer")
        logger.info(f"Loading {self.name} model...")
        self._load_model()

    def _load_model(self):
        """Load the MLP head and the CLIP backbone; set ``self.available`` accordingly."""
        try:
            import clip

            self.mlp = self._create_mlp()
            model_path = hf_hub_download("Eugeoter/waifu-scorer-v3", "model.pth")
            # NOTE(security): torch.load unpickles the downloaded file; only use
            # checkpoints from a trusted repository.
            state_dict = torch.load(model_path, map_location=self.device)

            # The checkpoint may prefix keys with "layers." (e.g. "layers.0.weight")
            # while the bare Sequential built here expects "0.weight"; strip the prefix.
            prefix = "layers."
            if any(key.startswith(prefix) for key in state_dict):
                state_dict = {
                    (k[len(prefix):] if k.startswith(prefix) else k): v
                    for k, v in state_dict.items()
                }

            self.mlp.load_state_dict(state_dict)
            self.mlp.to(self.device).eval()
            self.clip_model, self.preprocess = clip.load("ViT-L/14", device=self.device)
            self.available = True
        except ImportError:
            logger.error(f"Failed to load {self.name}: PyPI package 'clip' (openai-clip) not found. Please install it.")
            self.available = False
        except Exception as e:
            logger.error(f"Failed to load {self.name}: {e}")
            self.available = False

    def _create_mlp(self) -> torch.nn.Module:
        """Create the MLP architecture"""
        return torch.nn.Sequential(
            torch.nn.Linear(768, 2048),
            torch.nn.ReLU(),
            torch.nn.BatchNorm1d(2048),
            torch.nn.Dropout(0.3),
            torch.nn.Linear(2048, 512),
            torch.nn.ReLU(),
            torch.nn.BatchNorm1d(512),
            torch.nn.Dropout(0.3),
            torch.nn.Linear(512, 256),
            torch.nn.ReLU(),
            torch.nn.BatchNorm1d(256),
            torch.nn.Dropout(0.2),
            torch.nn.Linear(256, 128),
            torch.nn.ReLU(),
            torch.nn.BatchNorm1d(128),
            torch.nn.Dropout(0.1),
            torch.nn.Linear(128, 32),
            torch.nn.ReLU(),
            torch.nn.Linear(32, 1)
        )

    async def evaluate_batch(self, images: List[Image.Image]) -> List[Optional[float]]:
        """Score images with CLIP features fed through the MLP, clamped to 0-10.

        Returns ``None`` for every image when the model is unavailable or fails.
        """
        if not self.available:
            return [None] * len(images)
        if not images:
            return []
        try:
            # torch.no_grad() must wrap the body: as a decorator on an
            # ``async def`` it would only cover creation of the coroutine,
            # and .numpy() below would fail on a tensor that requires grad.
            with torch.no_grad():
                image_tensors = torch.cat([self.preprocess(img).unsqueeze(0) for img in images])
                image_tensors = image_tensors.to(self.device)
                features = self.clip_model.encode_image(image_tensors)
                features = features.float()  # Ensure features are float32 for MLP
                features = features / features.norm(dim=-1, keepdim=True)
                predictions = self.mlp(features)
                return predictions.clamp(0, 10).cpu().numpy().flatten().tolist()
        except Exception as e:
            logger.error(f"Error in {self.name}: {e}")
            return [None] * len(images)


class AestheticPredictorV25Model(BaseModel):
    """Aesthetic Predictor V2.5 model implementation"""

    def __init__(self):
        super().__init__("Aesthetic V2.5")
        logger.info(f"Loading {self.name} model...")
        try:
            self.model, self.preprocessor = convert_v2_5_from_siglip(
                low_cpu_mem_usage=True,
                trust_remote_code=True,  # Be cautious with trust_remote_code=True
            )
            if self.device == 'cuda':
                self.model = self.model.to(torch.bfloat16).cuda()
            self.available = True
        except Exception as e:
            logger.error(f"Failed to load {self.name}: {e}. Ensure 'aesthetic_predictor_v2_5.py' is correct and dependencies are installed.")
            self.available = False
            self.model, self.preprocessor = None, None

    async def evaluate_batch(self, images: List[Image.Image]) -> List[Optional[float]]:
        """Return the model's logit per image, clipped to 0-10.

        Returns ``None`` for every image when the model is unavailable or fails.
        """
        if not self.available:
            return [None] * len(images)
        if not images:
            return []
        try:
            # See WaifuScorerModel.evaluate_batch: the no_grad context has to
            # live inside the coroutine body to take effect.
            with torch.no_grad():
                images_rgb = [img.convert("RGB") for img in images]
                pixel_values = self.preprocessor(images=images_rgb, return_tensors="pt")["pixel_values"]
                if self.device == 'cuda':
                    pixel_values = pixel_values.to(torch.bfloat16).cuda()
                else:
                    pixel_values = pixel_values.float()  # Ensure correct dtype for CPU
                # Expects a model output object exposing ``.logits``.
                logits = self.model(pixel_values).logits
                # atleast_1d covers the single-image case where squeeze() yields a scalar.
                scores = np.atleast_1d(logits.squeeze().float().cpu().numpy())
            return [float(np.clip(s, 0.0, 10.0)) for s in scores]
        except Exception as e:
            logger.error(f"Error in {self.name}: {e}")
            return [None] * len(images)


class AnimeAestheticModel(BaseModel):
    """Anime Aesthetic model implementation"""

    def __init__(self):
        super().__init__("Anime Score")
        logger.info(f"Loading {self.name} model...")
        try:
            model_path = hf_hub_download(repo_id="skytnt/anime-aesthetic", filename="model.onnx")
            self.session = rt.InferenceSession(model_path, providers=['CPUExecutionProvider'])
            self.available = True
        except Exception as e:
            logger.error(f"Failed to load {self.name}: {e}")
            self.available = False
            self.session = None

    async def evaluate_batch(self, images: List[Image.Image]) -> List[Optional[float]]:
        """Score images one at a time; a failing image yields ``None`` without aborting the batch."""
        if not self.available:
            return [None] * len(images)
        scores = []
        for img in images:
            try:
                score = self._process_single_image(img)
                scores.append(float(np.clip(score * 10.0, 0.0, 10.0)))
            except Exception as e:
                logger.error(f"Error in {self.name} for single image processing: {e}")
                scores.append(None)
        return scores

    def _process_single_image(self, img: Image.Image, size: int = 224) -> float:
        """Letterbox ``img`` onto a ``size`` x ``size`` gray canvas and run the ONNX model.

        Args:
            img: Input image; converted to RGB before processing.
            size: Side length of the square model input.
                TODO(review): confirm the resolution and channel order
                (RGB vs BGR) the ONNX model expects; this code feeds RGB.

        Returns:
            The raw scalar output of the ONNX session.
        """
        img_np = np.array(img.convert("RGB")).astype(np.float32) / 255.0

        # Scale the longer side to ``size`` while keeping the aspect ratio;
        # max(1, ...) keeps extreme aspect ratios from producing a 0-pixel side.
        h, w = img_np.shape[:2]
        if h > w:
            new_h, new_w = size, max(1, int(size * w / h))
        else:
            new_h, new_w = max(1, int(size * h / w)), size

        resized_img = cv2.resize(img_np, (new_w, new_h), interpolation=cv2.INTER_AREA)

        # Center the resized image on a mid-gray canvas.
        canvas = np.ones((size, size, 3), dtype=np.float32) * 0.5
        pad_h = (size - new_h) // 2
        pad_w = (size - new_w) // 2
        canvas[pad_h:pad_h + new_h, pad_w:pad_w + new_w, :] = resized_img

        # HWC -> NCHW with a leading batch axis.
        input_tensor = np.transpose(canvas, (2, 0, 1))[np.newaxis, :].astype(np.float32)
        return self.session.run(None, {"img": input_tensor})[0].item()


class ImageEvaluator:
    """Main class for managing image evaluation"""

    def __init__(self):
        self.models: Dict[str, BaseModel] = {}
        self._initialize_models()
        self.results: List[EvaluationResult] = []

    def _initialize_models(self):
        """Initialize all evaluation models, keeping only those that loaded successfully."""
        model_classes = [
            ("aesthetic_shadow", AestheticShadowModel),
            ("waifu_scorer", WaifuScorerModel),
            ("aesthetic_predictor_v2_5", AestheticPredictorV25Model),
            ("anime_aesthetic", AnimeAestheticModel),
        ]

        for key, model_class in model_classes:
            try:
                model_instance = model_class()
                if not hasattr(model_instance, 'available'):
                    # Models without an explicit flag are assumed usable once constructed.
                    self.models[key] = model_instance
                    logger.info(f"Successfully loaded and initialized {model_instance.name} ({key}) (availability not explicitly tracked)")
                elif model_instance.available:
                    self.models[key] = model_instance
                    logger.info(f"Successfully loaded and initialized {model_instance.name} ({key})")
                else:
                    logger.warning(f"{model_instance.name} ({key}) was not loaded successfully and will be skipped.")
            except Exception as e:
                logger.error(f"Failed to initialize {key}: {e}")

    async def evaluate_images(
        self,
        file_paths: List[str],
        selected_models: List[str],
        batch_size: int = 8,
        progress_callback=None
    ) -> Tuple[List[EvaluationResult], List[str]]:
        """Evaluate images with selected models.

        Args:
            file_paths: Uploaded files, either path strings / path-like objects
                or objects exposing the path as ``.name`` (temp-file wrappers).
            selected_models: Model keys to run; keys that are not loaded are
                skipped and reported in the logs.
            batch_size: Number of images passed to a model per call.
            progress_callback: Optional ``callback(percent, description)``.

        Returns:
            ``(results, logs)``; ``results`` is also stored on ``self.results``
            when at least one image could be loaded.
        """
        logs: List[str] = []
        current_results: List[EvaluationResult] = []
        images_data = []

        for path_obj in file_paths:
            # Gradio hands over plain path strings or temp-file wrappers
            # depending on version/configuration; accept both.
            if isinstance(path_obj, (str, os.PathLike)):
                path = os.fspath(path_obj)
            else:
                path = path_obj.name
            try:
                # convert() returns an independent copy, so the file handle
                # can be closed right away.
                with Image.open(path) as opened:
                    img = opened.convert("RGB")
                images_data.append({"image": img, "path": path, "name": Path(path).name})
            except Exception as e:
                logs.append(f"Failed to load {Path(path).name}: {e}")

        if not images_data:
            logs.append("No valid images to process")
            return current_results, logs

        logs.append(f"Loaded {len(images_data)} images")

        # Only run models that were successfully initialized.
        active_selected_models = [m_key for m_key in selected_models if m_key in self.models]
        if len(active_selected_models) != len(selected_models):
            disabled_models = set(selected_models) - set(active_selected_models)
            logs.append(f"Warning: The following models were selected but are not available: {', '.join(disabled_models)}")

        # One result per image, in the same order as images_data, so that
        # batch offsets index both lists consistently.
        current_results = [
            EvaluationResult(file_name=data["name"], image_path=data["path"])
            for data in images_data
        ]

        total_images = len(images_data)
        model_count = len(active_selected_models)

        for model_idx, model_key in enumerate(active_selected_models):
            model_instance = self.models[model_key]
            logs.append(f"Processing with {model_instance.name}...")

            for i in range(0, total_images, batch_size):
                batch_data = images_data[i:i + batch_size]
                batch_images_pil = [d["image"] for d in batch_data]

                try:
                    scores = await model_instance.evaluate_batch(batch_images_pil)
                    for k, score in enumerate(scores):
                        current_results[i + k].scores[model_key] = score
                except Exception as e:
                    logger.error(f"Error evaluating batch with {model_instance.name}: {e}")
                    for k in range(len(batch_images_pil)):
                        current_results[i + k].scores[model_key] = None

                if progress_callback:
                    # Each model owns an equal slice of the bar; within the
                    # slice, progress follows the images processed so far.
                    overall_progress = (
                        (model_idx / model_count)
                        + ((i + len(batch_data)) / total_images) / model_count
                    ) * 100
                    progress_callback(min(overall_progress, 100), f"Model: {model_instance.name}, Batch {i//batch_size + 1}")

        for result in current_results:
            result.calculate_final_score(active_selected_models)

        logs.append("Evaluation complete.")
        self.results = current_results
        return current_results, logs

    def get_results_dataframe(self, selected_models_keys: List[str]) -> pd.DataFrame:
        """Build a display table from ``self.results`` for the given model keys.

        Columns are 'File Name', 'Image' (the file path), one column per
        loaded selected model, and 'Final Score'. Scores are formatted with
        four decimals, or "N/A" when missing. Returns an empty DataFrame
        when there are no results.
        """
        if not self.results:
            return pd.DataFrame()

        valid_selected_models_keys = [key for key in selected_models_keys if key in self.models]

        data = []
        for result in self.results:
            row = {
                'File Name': result.file_name,
                # A DataFrame cell must hold plain data; a Gradio component
                # object placed here cannot be rendered by gr.Dataframe.
                'Image': result.image_path,
            }
            for model_key in valid_selected_models_keys:
                score = result.scores.get(model_key)
                row[self.models[model_key].name] = f"{score:.4f}" if score is not None else "N/A"
            row['Final Score'] = f"{result.final_score:.4f}" if result.final_score is not None else "N/A"
            data.append(row)

        column_order = (
            ['File Name', 'Image']
            + [self.models[key].name for key in valid_selected_models_keys]
            + ['Final Score']
        )

        df = pd.DataFrame(data)
        if not df.empty:
            df = df[column_order]
        return df


def create_interface():
    """Create the Gradio interface"""
    evaluator = ImageEvaluator()

    # (display label, model key) pairs for the models that loaded successfully.
    model_options = [(model.name, key) for key, model in evaluator.models.items()]
    default_selected_model_labels = [name for name, _ in model_options]

    def labels_to_keys(labels):
        """Translate checkbox display labels back to model keys."""
        labels = labels or []
        return [key for label, key in model_options if label in labels]

    with gr.Blocks(theme=gr.themes.Soft(), title="Image Evaluation Tool") as demo:
        # NOTE: a "TypeError: argument of type 'bool' is not iterable" raised from
        # gradio_client/utils.py at startup usually indicates a Gradio version
        # incompatibility; try `pip install --upgrade gradio` or a known stable
        # Gradio version.
        gr.Markdown("""
        # 🎨 Advanced Image Evaluation Tool

        Evaluate images using state-of-the-art aesthetic and quality prediction models.
        Upload your images and select the models you want to use for evaluation.
        """)

        with gr.Row():
            with gr.Column(scale=1):
                input_files = gr.File(
                    label="Upload Images",
                    file_count="multiple",
                    file_types=["image"]
                )
                model_checkboxes = gr.CheckboxGroup(
                    choices=[label for label, _ in model_options],
                    value=default_selected_model_labels,
                    label="Select Models",
                    info="Choose which models to use for evaluation. Models that failed to load will not be available."
                )
                batch_size_slider = gr.Slider(
                    minimum=1,
                    maximum=32,
                    value=8,
                    step=1,
                    label="Batch Size",
                    info="Number of images to process at once per model."
                )
                with gr.Row():
                    evaluate_btn = gr.Button("🚀 Evaluate Images", variant="primary", scale=2)
                    clear_btn = gr.Button("🗑️ Clear", variant="secondary", scale=1)

            with gr.Column(scale=3):
                logs_display = gr.Textbox(
                    label="Processing Logs",
                    lines=10,
                    max_lines=20,
                    autoscroll=True,
                    interactive=False
                )
                progress_status = gr.Label(label="Progress")
                results_df_display = gr.Dataframe(
                    label="Evaluation Results",
                    interactive=False,
                    wrap=True,
                )
                download_button = gr.Button("📥 Download Results (CSV)", variant="secondary")
                # Hidden until a CSV has been generated.
                download_file_output_component = gr.File(label="Download", visible=False)

        async def run_evaluation(files, selected_model_labels, current_batch_size, progress=gr.Progress(track_tqdm=True)):
            """Run the selected models; return (log text, results table, status)."""
            # Every return must supply exactly one value per output component.
            if not files:
                return "Please upload images first.", pd.DataFrame(), "No files uploaded."

            selected_model_keys = labels_to_keys(selected_model_labels)
            if not selected_model_keys:
                return "Please select at least one model.", pd.DataFrame(), "No models selected."

            progress_updates = []

            def progress_callback_for_eval(p_value, p_desc):
                progress(p_value / 100, desc=p_desc)
                progress_updates.append(f"{p_desc} - {p_value:.0f}%")

            processed_results, log_messages = await evaluator.evaluate_images(
                files,
                selected_model_keys,
                int(current_batch_size),
                progress_callback_for_eval
            )

            df = evaluator.get_results_dataframe(selected_model_keys)
            log_text = "\n".join(log_messages + progress_updates)
            final_status = "Evaluation complete." if processed_results else "Evaluation failed or no results."
            progress(1.0, desc=final_status)
            return log_text, df, final_status

        def handle_model_selection_change(selected_model_labels_updated):
            """Recompute final scores and the table for the new model selection."""
            if not evaluator.results:
                return pd.DataFrame()

            selected_model_keys_updated = labels_to_keys(selected_model_labels_updated)
            for res_obj in evaluator.results:
                res_obj.calculate_final_score(selected_model_keys_updated)
            return evaluator.get_results_dataframe(selected_model_keys_updated)

        def clear_all_outputs():
            """Drop stored results and reset logs, table, status and download file."""
            evaluator.results = []
            return "", pd.DataFrame(), "Cleared.", gr.update(value=None, visible=False)

        def generate_csv_for_download(selected_model_labels_for_csv):
            """Write the current results (without the Image column) to a temp CSV."""
            if not evaluator.results:
                gr.Warning("No results to download.")
                return gr.update(value=None, visible=False)

            selected_model_keys_for_csv = labels_to_keys(selected_model_labels_for_csv)
            df_for_csv = evaluator.get_results_dataframe(selected_model_keys_for_csv).copy()
            if 'Image' in df_for_csv.columns:
                df_for_csv.drop(columns=['Image'], inplace=True)

            if df_for_csv.empty:
                gr.Warning("No data to download based on current selection.")
                return gr.update(value=None, visible=False)

            import tempfile
            # Write through the open handle: reopening the file by name while it
            # is still open fails on some platforms. delete=False keeps the
            # file around for Gradio to serve.
            with tempfile.NamedTemporaryFile(
                mode='w', delete=False, suffix='.csv', encoding='utf-8', newline=''
            ) as tmp_file:
                df_for_csv.to_csv(tmp_file, index=False)
                csv_path = tmp_file.name
            # Reveal the otherwise hidden file component so the CSV can be downloaded.
            return gr.update(value=csv_path, visible=True)

        evaluate_btn.click(
            fn=run_evaluation,
            inputs=[input_files, model_checkboxes, batch_size_slider],
            outputs=[logs_display, results_df_display, progress_status]
        )
        model_checkboxes.change(
            fn=handle_model_selection_change,
            inputs=[model_checkboxes],
            outputs=[results_df_display]
        )
        clear_btn.click(
            fn=clear_all_outputs,
            outputs=[logs_display, results_df_display, progress_status, download_file_output_component]
        )
        download_button.click(
            fn=generate_csv_for_download,
            inputs=[model_checkboxes],
            outputs=[download_file_output_component]
        )

        gr.Markdown("""
        ### 📝 Notes
        - **Model Selection**: Choose which models to use for evaluation. The final score is the average of the selected models. Models that failed to load during startup will not be listed or will be ignored.
        - **Batch Size**: Adjust based on your system's VRAM and RAM. Smaller batches use less memory but may be slower overall.
        - **Results Table**: Displays scores from selected models and the final average. The Image column lists the path of each evaluated file.
        - **Download**: Export results (excluding the Image column) as a CSV file for further analysis.

        ### 🎯 Score Interpretation (General Guide)
        - **7-10**: High quality/aesthetic appeal
        - **5-7**: Medium quality
        - **0-5**: Lower quality

        _(Note: Score ranges and interpretations can vary between models.)_
        """)

    return demo


if __name__ == "__main__":
    # Ensure 'aesthetic_predictor_v2_5.py' exists and 'openai-clip' is installed for WaifuScorer.
    # Example: pip install openai-clip transformers==4.30.2 onnxruntime gradio pandas Pillow opencv-python
    app_interface = create_interface()
    # .queue() helps with multiple users and long-running tasks;
    # debug=True surfaces detailed Gradio errors during development.
    app_interface.queue().launch(debug=True)