# CIET / app.py
# VOIDER's picture
# Update app.py
# 027d32e verified
# raw
# history blame
# 29.7 kB
"""
Modern Image Evaluation Tool with Aesthetic and Quality Prediction Models
This refactored version features:
- Modern async/await patterns with proper error handling
- Type hints throughout for better code maintainability
- Dependency injection and factory patterns
- Proper resource management with context managers
- Configuration-driven model loading
- Improved batch processing with memory optimization
- Clean separation of concerns with proper abstraction layers
"""
import asyncio
import base64
import csv
import logging
import os
import shutil
import tempfile
from abc import ABC, abstractmethod
from contextlib import asynccontextmanager
from dataclasses import dataclass, field
from enum import Enum
from io import BytesIO, StringIO
from pathlib import Path
from typing import Any, Dict, List, Optional, Protocol, Tuple, Type, Union

import cv2
import gradio as gr
import numpy as np
import onnxruntime as ort
import torch
import torch.nn as nn
from huggingface_hub import hf_hub_download
from PIL import Image
from transformers import pipeline
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# =============================================================================
# Configuration and Data Models
# =============================================================================
class ModelType(Enum):
    """Identifiers for the scoring models this tool knows about.

    Each member's string value doubles as the key under which that model's
    scores are stored in per-image result dictionaries.
    """

    AESTHETIC_SHADOW = "aesthetic_shadow"
    WAIFU_SCORER = "waifu_scorer"
    AESTHETIC_PREDICTOR_V2_5 = "aesthetic_predictor_v2_5"
    ANIME_AESTHETIC = "anime_aesthetic"
@dataclass
class ModelConfig:
    """Per-model settings consumed when a model wrapper is constructed."""

    # Internal identifier of the model.
    name: str
    # Human-readable label used in log messages and UI listings.
    display_name: str
    # Models with enabled=False are skipped during initialisation.
    enabled: bool = True
    # Declares whether the model accepts more than one image per call.
    batch_supported: bool = True
    # Optional local weights file; wrappers may fall back to a download.
    model_path: Optional[str] = None
    # Optional cache directory forwarded to the download helper.
    cache_dir: Optional[str] = None
@dataclass
class ProcessingConfig:
    """Global knobs that control how image batches are processed."""

    # Whether the batch size should be probed automatically.
    auto_batch: bool = False
    # Batch size to use when automatic probing is off.
    manual_batch_size: int = 1
    # Upper bound for the automatically probed batch size.
    max_batch_size: int = 64
    # Evaluated once, when the class is defined: CUDA if present, else CPU.
    device: str = "cuda" if torch.cuda.is_available() else "cpu"
    # (min, max) bounds the averaged final score is clipped to.
    score_range: Tuple[float, float] = (0.0, 10.0)
@dataclass
class EvaluationResult:
    """Outcome of scoring one image with the selected models."""

    # Base name of the image file.
    file_name: str
    # Path the image was loaded from.
    file_path: str
    # Base64-encoded JPEG thumbnail for inline HTML display.
    thumbnail_b64: str
    # Score per model key; None where a model produced no score.
    model_scores: Dict[str, Optional[float]] = field(default_factory=dict)
    # Aggregate over the available model scores, or None if there were none.
    final_score: Optional[float] = None
    # Time attributed to this image (units are not established in this block).
    processing_time: float = 0.0
    # Error description; None marks a successful evaluation.
    error: Optional[str] = None
@dataclass
class BatchResult:
    """Aggregate outcome of one processing request."""

    # One entry per image that was evaluated.
    results: List[EvaluationResult]
    # Human-readable progress and error messages collected along the way.
    logs: List[str]
    # Wall time spent on the whole request.
    processing_time: float
    # Batch size the request was actually run with.
    batch_size_used: int
    # Number of results without an error.
    success_count: int
    # Number of failures.
    error_count: int
# =============================================================================
# Model Interfaces and Implementations
# =============================================================================
class BaseModel(Protocol):
    """Structural interface every scoring-model wrapper must satisfy."""

    async def predict(self, images: List[Image.Image]) -> List[Optional[float]]:
        """Return one score per input image; ``None`` marks a missing score."""
        ...

    def is_available(self) -> bool:
        """Report whether the model loaded and can run inference."""
        ...

    def cleanup(self) -> None:
        """Release whatever resources the model is holding."""
        ...
class ModernMLP(nn.Module):
    """Feed-forward regression head mapping a feature vector to one scalar.

    The network is a stack of
    ``Linear -> activation [-> BatchNorm1d] [-> Dropout]`` stages, one per
    hidden width, followed by a final ``Linear`` layer with a single output.

    Args:
        input_size: Number of features in each input vector.
        hidden_dims: Width of each hidden stage. Defaults to
            ``[2048, 512, 256, 128, 32]``.
        dropout_rates: Dropout probability per hidden stage; a rate of ``0``
            omits the dropout layer for that stage. Defaults to
            ``[0.3, 0.3, 0.2, 0.1, 0.0]``. The two lists are paired
            positionally, so surplus entries in the longer one are ignored.
        use_batch_norm: Insert ``BatchNorm1d`` after the activation of every
            stage except the one at the last index of ``hidden_dims``.
        activation: Module *class* (not an instance); it is instantiated once
            per hidden stage.
    """

    def __init__(
        self,
        input_size: int,
        hidden_dims: Optional[List[int]] = None,
        dropout_rates: Optional[List[float]] = None,
        use_batch_norm: bool = True,
        activation: Type[nn.Module] = nn.ReLU,
    ):
        super().__init__()
        if hidden_dims is None:
            hidden_dims = [2048, 512, 256, 128, 32]
        if dropout_rates is None:
            dropout_rates = [0.3, 0.3, 0.2, 0.1, 0.0]

        last_index = len(hidden_dims) - 1
        layers: List[nn.Module] = []
        prev_dim = input_size
        for i, (hidden_dim, dropout_rate) in enumerate(zip(hidden_dims, dropout_rates)):
            layers.append(nn.Linear(prev_dim, hidden_dim))
            layers.append(activation())
            if use_batch_norm and i < last_index:
                layers.append(nn.BatchNorm1d(hidden_dim))
            if dropout_rate > 0:
                layers.append(nn.Dropout(dropout_rate))
            prev_dim = hidden_dim

        # Final projection down to a single scalar output.
        layers.append(nn.Linear(prev_dim, 1))
        # The attribute name is part of the state-dict key prefix; keep it.
        self.network = nn.Sequential(*layers)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Run ``x`` through the layer stack and return the raw output."""
        return self.network(x)
class WaifuScorerModel:
    """Scores images with a CLIP ViT-L/14 image encoder followed by an MLP head.

    Load failures never propagate out of the constructor: any exception raised
    while initialising is logged and leaves the instance unavailable, in which
    case :meth:`predict` returns ``None`` for every image.
    """

    def __init__(self, config: ModelConfig, device: str):
        self.config = config
        self.device = device
        self.dtype = torch.float32
        self._available = False
        self._model = None
        self._clip_model = None
        self._preprocess = None
        self._initialize_model()

    def _initialize_model(self) -> None:
        """Load the MLP weights and the CLIP encoder; record availability."""
        try:
            import clip

            model_path = self._get_model_path()
            self._model = ModernMLP(input_size=768)

            if model_path.endswith(".safetensors"):
                from safetensors.torch import load_file
                state_dict = load_file(model_path)
            else:
                # SECURITY NOTE(review): torch.load unpickles the file, which
                # can execute arbitrary code for an untrusted checkpoint.
                # Consider weights_only=True if the installed torch supports it.
                state_dict = torch.load(model_path, map_location=self.device)

            # NOTE(review): load_state_dict is strict by default, so checkpoint
            # keys must match ModernMLP's ``network.*`` parameter names —
            # confirm against the published checkpoint.
            self._model.load_state_dict(state_dict)
            self._model.to(self.device)
            self._model.eval()

            self._clip_model, self._preprocess = clip.load("ViT-L/14", device=self.device)
            self._available = True
            logger.info(f"WaifuScorer model loaded successfully on {self.device}")
        except Exception as e:
            logger.error(f"Failed to initialize WaifuScorer: {e}")
            self._available = False

    def _get_model_path(self) -> str:
        """Return the configured weights file, or download the default one."""
        if self.config.model_path and os.path.isfile(self.config.model_path):
            return self.config.model_path
        # Default download path
        model_path = "Eugeoter/waifu-scorer-v3/model.pth"
        username, repo_id, model_name = model_path.split("/")[-3:]
        return hf_hub_download(f"{username}/{repo_id}", model_name, cache_dir=self.config.cache_dir)

    async def predict(self, images: List[Image.Image]) -> List[Optional[float]]:
        """Return a score clamped to [0, 10] per image, or ``None`` on failure.

        Note that inference runs synchronously inside this coroutine.
        """
        if not self._available:
            return [None] * len(images)
        if not images:
            # Nothing to score; avoids torch.cat on an empty list.
            return []

        try:
            # The original code duplicates a lone image "for CLIP
            # compatibility"; the duplicate's score is dropped below.
            batch_images = images * 2 if len(images) == 1 else images

            image_tensors = [self._preprocess(img).unsqueeze(0) for img in batch_images]
            image_batch = torch.cat(image_tensors).to(self.device)

            with torch.no_grad():
                image_features = self._clip_model.encode_image(image_batch)
                # L2-normalise; zero norms are replaced by 1 to avoid 0/0.
                norm = image_features.norm(2, dim=-1, keepdim=True)
                norm[norm == 0] = 1
                normalized_features = (image_features / norm).to(device=self.device, dtype=self.dtype)
                predictions = self._model(normalized_features)

            scores = predictions.clamp(0, 10).cpu().numpy().reshape(-1).tolist()
            return scores[:len(images)]
        except Exception as e:
            logger.error(f"Error in WaifuScorer prediction: {e}")
            return [None] * len(images)

    def is_available(self) -> bool:
        """Report whether the model loaded and has not been cleaned up."""
        return self._available

    def cleanup(self) -> None:
        """Drop model references and free cached CUDA memory.

        Safe to call more than once. Afterwards the instance reports itself
        unavailable, so :meth:`predict` returns ``None`` scores.
        """
        self._available = False
        # Rebind to None rather than ``del`` so the attributes keep existing.
        self._model = None
        self._clip_model = None
        self._preprocess = None
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
class AestheticShadowModel:
    """Wrapper around the Aesthetic Shadow image-classification pipeline.

    The score of an image is the pipeline's probability for the ``'hq'``
    label, scaled to 0-10. Load failures are logged and leave the instance
    unavailable; they never propagate out of the constructor.
    """

    def __init__(self, config: ModelConfig, device: str):
        self.config = config
        self.device = device
        self._available = False
        self._model = None
        self._initialize_model()

    def _initialize_model(self) -> None:
        """Build the transformers pipeline and record availability."""
        try:
            self._model = pipeline(
                "image-classification",
                model="NeoChen1024/aesthetic-shadow-v2-backup",
                device=self.device
            )
            self._available = True
            logger.info("Aesthetic Shadow model loaded successfully")
        except Exception as e:
            logger.error(f"Failed to initialize Aesthetic Shadow: {e}")
            self._available = False

    async def predict(self, images: List[Image.Image]) -> List[Optional[float]]:
        """Return a 0-10 score per image; ``None`` where no score is found.

        Note that inference runs synchronously inside this coroutine.
        """
        if not self._available:
            return [None] * len(images)
        if not images:
            return []

        try:
            results = self._model(images)
            scores: List[Optional[float]] = []
            for result in results:
                try:
                    hq_score = next(p for p in result if p['label'] == 'hq')['score']
                except (StopIteration, KeyError, TypeError):
                    # No 'hq' entry, or an unexpected result shape.
                    scores.append(None)
                else:
                    scores.append(float(np.clip(hq_score * 10.0, 0.0, 10.0)))
            return scores
        except Exception as e:
            logger.error(f"Error in Aesthetic Shadow prediction: {e}")
            return [None] * len(images)

    def is_available(self) -> bool:
        """Report whether the pipeline loaded and has not been cleaned up."""
        return self._available

    def cleanup(self) -> None:
        """Drop the pipeline reference; safe to call more than once."""
        self._available = False
        # Rebind to None rather than ``del`` so the attribute keeps existing.
        self._model = None
class AestheticPredictorV25Model:
    """Wrapper for the Aesthetic Predictor V2.5 (SigLIP-based) model.

    Load failures are logged and leave the instance unavailable; they never
    propagate out of the constructor.
    """

    def __init__(self, config: ModelConfig, device: str):
        self.config = config
        self.device = device
        self._available = False
        self._model = None
        self._preprocessor = None
        self._initialize_model()

    def _use_cuda(self) -> bool:
        """Return True when a CUDA device was requested and one is present."""
        return str(self.device).startswith("cuda") and torch.cuda.is_available()

    def _initialize_model(self) -> None:
        """Load model and preprocessor; move to the configured CUDA device."""
        try:
            from aesthetic_predictor_v2_5 import convert_v2_5_from_siglip

            # NOTE(review): trust_remote_code=True presumably permits running
            # code shipped with the model repository — confirm this is intended.
            self._model, self._preprocessor = convert_v2_5_from_siglip(
                low_cpu_mem_usage=True,
                trust_remote_code=True,
            )
            if self._use_cuda():
                # Honour the configured device instead of always using the
                # default CUDA device whenever one happens to exist.
                self._model = self._model.to(torch.bfloat16).to(self.device)
            self._available = True
            logger.info("Aesthetic Predictor V2.5 loaded successfully")
        except Exception as e:
            logger.error(f"Failed to initialize Aesthetic Predictor V2.5: {e}")
            self._available = False

    async def predict(self, images: List[Image.Image]) -> List[Optional[float]]:
        """Return a score clipped to [0, 10] (4 decimals) per image.

        Returns ``None`` for every image if the model is unavailable or
        inference fails. Inference runs synchronously inside this coroutine.
        """
        if not self._available:
            return [None] * len(images)
        if not images:
            return []

        try:
            rgb_images = [img.convert("RGB") for img in images]
            pixel_values = self._preprocessor(images=rgb_images, return_tensors="pt").pixel_values
            if self._use_cuda():
                pixel_values = pixel_values.to(torch.bfloat16).to(self.device)

            with torch.inference_mode():
                logits = self._model(pixel_values).logits
            # Flatten so a single image yields a 1-element array, not a 0-d one.
            scores = logits.float().cpu().numpy().reshape(-1)
            return [float(np.round(np.clip(s, 0.0, 10.0), 4)) for s in scores]
        except Exception as e:
            logger.error(f"Error in Aesthetic Predictor V2.5 prediction: {e}")
            return [None] * len(images)

    def is_available(self) -> bool:
        """Report whether the model loaded and has not been cleaned up."""
        return self._available

    def cleanup(self) -> None:
        """Drop model references; safe to call more than once."""
        self._available = False
        # Rebind to None rather than ``del`` so the attributes keep existing.
        self._model = None
        self._preprocessor = None
class AnimeAestheticModel:
    """ONNX-based Anime Aesthetic model, run one image at a time on CPU.

    Load failures are logged and leave the instance unavailable; they never
    propagate out of the constructor.
    """

    def __init__(self, config: ModelConfig, device: str):
        self.config = config
        self.device = device
        self._available = False
        self._session = None
        self._initialize_model()

    def _initialize_model(self) -> None:
        """Download the ONNX file and open an inference session."""
        try:
            model_path = hf_hub_download(repo_id="skytnt/anime-aesthetic", filename="model.onnx")
            self._session = ort.InferenceSession(model_path, providers=['CPUExecutionProvider'])
            self._available = True
            logger.info("Anime Aesthetic model loaded successfully")
        except Exception as e:
            logger.error(f"Failed to initialize Anime Aesthetic: {e}")
            self._available = False

    async def predict(self, images: List[Image.Image]) -> List[Optional[float]]:
        """Return a 0-10 score per image; ``None`` for images that fail.

        Images are scored individually, so one failure does not affect the
        rest. Inference runs synchronously inside this coroutine.
        """
        if not self._available:
            return [None] * len(images)

        scores: List[Optional[float]] = []
        for img in images:
            try:
                score = self._predict_single(img)
                scores.append(float(np.clip(score * 10.0, 0.0, 10.0)))
            except Exception as e:
                logger.error(f"Error predicting anime aesthetic for image: {e}")
                scores.append(None)
        return scores

    def _predict_single(self, img: Image.Image) -> float:
        """Letterbox ``img`` onto a 768x768 canvas and return the raw output."""
        # Force three channels so grayscale/RGBA input fits the RGB canvas.
        img_np = np.array(img.convert("RGB")).astype(np.float32) / 255.0
        s = 768
        h, w = img_np.shape[:2]

        # Scale the longer side to ``s`` keeping the aspect ratio. The shorter
        # side is floored at 1 px: extreme aspect ratios would otherwise round
        # to 0 and make cv2.resize fail.
        if h > w:
            new_h, new_w = s, max(1, int(s * w / h))
        else:
            new_h, new_w = max(1, int(s * h / w)), s
        resized = cv2.resize(img_np, (new_w, new_h))

        # Centre the resized image on a black square canvas.
        canvas = np.zeros((s, s, 3), dtype=np.float32)
        pad_h = (s - new_h) // 2
        pad_w = (s - new_w) // 2
        canvas[pad_h:pad_h + new_h, pad_w:pad_w + new_w] = resized

        # HWC -> CHW with a leading batch axis of 1.
        input_tensor = np.transpose(canvas, (2, 0, 1))[np.newaxis, :]
        return self._session.run(None, {"img": input_tensor})[0].item()

    def is_available(self) -> bool:
        """Report whether the session loaded and has not been cleaned up."""
        return self._available

    def cleanup(self) -> None:
        """Drop the ONNX session reference; safe to call more than once."""
        self._available = False
        # Rebind to None rather than ``del`` so the attribute keeps existing.
        self._session = None
# =============================================================================
# Model Factory and Manager
# =============================================================================
class ModelFactory:
    """Maps a :class:`ModelType` to its wrapper class and instantiates it."""

    # Registry of the wrapper class responsible for each model type.
    _MODEL_CLASSES = {
        ModelType.AESTHETIC_SHADOW: AestheticShadowModel,
        ModelType.WAIFU_SCORER: WaifuScorerModel,
        ModelType.AESTHETIC_PREDICTOR_V2_5: AestheticPredictorV25Model,
        ModelType.ANIME_AESTHETIC: AnimeAestheticModel,
    }

    @classmethod
    def create_model(cls, model_type: ModelType, config: ModelConfig, device: str) -> BaseModel:
        """Build the wrapper registered for ``model_type``.

        Raises:
            ValueError: If no wrapper is registered for ``model_type``.
        """
        if model_type not in cls._MODEL_CLASSES:
            raise ValueError(f"Unknown model type: {model_type}")
        wrapper_cls = cls._MODEL_CLASSES[model_type]
        return wrapper_cls(config, device)
class ModelManager:
    """Loads the enabled scoring models and runs image batches through them.

    Requests submitted through :meth:`process_images` are placed on an
    ``asyncio.Queue`` and handled one at a time by a background worker task.
    """

    def __init__(self, processing_config: ProcessingConfig):
        self.config = processing_config
        self.models: Dict[ModelType, BaseModel] = {}
        self.model_configs = self._create_default_configs()
        self._processing_queue = asyncio.Queue()
        self._worker_task: Optional[asyncio.Task] = None
        self._temp_dir = Path(tempfile.mkdtemp())
        self._initialize_models()

    def _create_default_configs(self) -> Dict[ModelType, ModelConfig]:
        """Return the built-in configuration for every known model type."""
        return {
            ModelType.AESTHETIC_SHADOW: ModelConfig(
                name="aesthetic_shadow",
                display_name="Aesthetic Shadow"
            ),
            ModelType.WAIFU_SCORER: ModelConfig(
                name="waifu_scorer",
                display_name="Waifu Scorer"
            ),
            ModelType.AESTHETIC_PREDICTOR_V2_5: ModelConfig(
                name="aesthetic_predictor_v2_5",
                display_name="Aesthetic V2.5"
            ),
            ModelType.ANIME_AESTHETIC: ModelConfig(
                name="anime_aesthetic",
                display_name="Anime Score",
                batch_supported=False
            ),
        }

    def _initialize_models(self) -> None:
        """Instantiate every enabled model, keeping those that report ready."""
        logger.info("Initializing models...")
        for model_type, config in self.model_configs.items():
            if not config.enabled:
                continue
            try:
                model = ModelFactory.create_model(model_type, config, self.config.device)
                if model.is_available():
                    self.models[model_type] = model
                    logger.info(f"✓ {config.display_name} loaded successfully")
                else:
                    logger.warning(f"✗ {config.display_name} failed to load")
            except Exception as e:
                logger.error(f"✗ {config.display_name} initialization error: {e}")
        logger.info(f"Initialized {len(self.models)} models successfully")

    async def start_worker(self) -> None:
        """Start the background worker unless one is already running."""
        if self._worker_task is None or self._worker_task.done():
            self._worker_task = asyncio.create_task(self._worker_loop())
            logger.info("Background worker started")

    async def _worker_loop(self) -> None:
        """Serve queued requests until the ``None`` shutdown sentinel arrives."""
        while True:
            request = await self._processing_queue.get()
            try:
                if request is None:  # Shutdown signal
                    break
                future = request['future']
                try:
                    result = await self._process_request(request)
                except Exception as e:
                    # A cancelled future rejects set_exception/set_result;
                    # skipping it keeps the worker alive.
                    if not future.done():
                        future.set_exception(e)
                else:
                    if not future.done():
                        future.set_result(result)
            finally:
                # Also runs for the sentinel, keeping queue accounting balanced.
                self._processing_queue.task_done()

    async def process_images(
        self,
        file_paths: List[str],
        selected_models: List[ModelType],
        auto_batch: bool = False,
        manual_batch_size: int = 1
    ) -> BatchResult:
        """Queue a scoring request and wait for its result.

        Args:
            file_paths: Paths of the images to score.
            selected_models: Models whose scores should be computed.
            auto_batch: Probe for a batch size instead of using the manual one.
            manual_batch_size: Batch size used when ``auto_batch`` is false.

        Returns:
            The :class:`BatchResult` produced by the worker.
        """
        # Without a running worker the future below would never resolve.
        await self.start_worker()
        future = asyncio.get_running_loop().create_future()
        request = {
            'file_paths': file_paths,
            'selected_models': selected_models,
            'auto_batch': auto_batch,
            'manual_batch_size': manual_batch_size,
            'future': future
        }
        await self._processing_queue.put(request)
        return await future

    async def _process_request(self, request: Dict) -> BatchResult:
        """Load the requested images, score them in batches, and summarise."""
        loop = asyncio.get_running_loop()
        start_time = loop.time()
        logs: List[str] = []
        results: List[EvaluationResult] = []

        file_paths = request['file_paths']
        selected_models = request['selected_models']
        auto_batch = request['auto_batch']
        manual_batch_size = request['manual_batch_size']

        images, valid_paths = await self._load_images(file_paths, logs)
        if not images:
            return BatchResult([], logs, 0.0, 0, 0, len(file_paths))
        failed_loads = len(file_paths) - len(images)

        batch_size = await self._determine_batch_size(images, auto_batch, manual_batch_size, logs)

        for i in range(0, len(images), batch_size):
            batch_images = images[i:i + batch_size]
            batch_paths = valid_paths[i:i + batch_size]
            batch_results = await self._process_batch(batch_images, batch_paths, selected_models, logs)
            results.extend(batch_results)

        processing_time = loop.time() - start_time
        success_count = sum(1 for r in results if r.error is None)
        # Count unreadable files too, matching the early-return path above.
        error_count = (len(results) - success_count) + failed_loads
        return BatchResult(
            results=results,
            logs=logs,
            processing_time=processing_time,
            batch_size_used=batch_size,
            success_count=success_count,
            error_count=error_count
        )

    async def _load_images(self, file_paths: List[str], logs: List[str]) -> Tuple[List[Image.Image], List[str]]:
        """Open each path as an RGB image, logging and skipping failures.

        Returns:
            The loaded images and, in the same order, the paths they came from.
        """
        images: List[Image.Image] = []
        valid_paths: List[str] = []
        logs.append(f"Loading {len(file_paths)} images...")
        for path in file_paths:
            try:
                # convert() yields an independent in-memory image, so the
                # underlying file can be closed right away.
                with Image.open(path) as opened:
                    img = opened.convert("RGB")
            except Exception as e:
                logs.append(f"Failed to load {path}: {e}")
            else:
                images.append(img)
                valid_paths.append(path)
        logs.append(f"Successfully loaded {len(images)} images")
        return images, valid_paths

    async def _determine_batch_size(
        self,
        images: List[Image.Image],
        auto_batch: bool,
        manual_batch_size: int,
        logs: List[str]
    ) -> int:
        """Choose the batch size for a request; the result is always >= 1.

        In manual mode the requested size is capped at the number of images.
        In auto mode the size is doubled while the first two loaded models
        still return at least one score for a probe batch, up to
        ``config.max_batch_size``.
        """
        if not auto_batch:
            # int() tolerates float input; max() guards against 0 or negative
            # values, which would make the caller's range() step invalid.
            return max(1, min(int(manual_batch_size), len(images)))

        limit = min(len(images), self.config.max_batch_size)
        test_image = images[0:1]
        probe_models = list(self.models.values())[:2]
        batch_size = 1
        while batch_size <= limit:
            test_batch = test_image * batch_size
            failed = False
            try:
                for model in list(probe_models):
                    predictions = await model.predict(test_batch)
                    # Models in this file swallow their own errors and return
                    # all-None, so that is the only failure signal available.
                    if all(p is None for p in predictions):
                        if batch_size == 1:
                            # Fails even on one image: not a batch-size
                            # problem, so stop probing with this model.
                            probe_models.remove(model)
                        else:
                            failed = True
                            break
            except Exception:
                failed = True
            if failed:
                break
            batch_size *= 2

        optimal_batch = max(1, batch_size // 2)
        logs.append(f"Auto-tuned batch size: {optimal_batch}")
        return optimal_batch

    async def _process_batch(
        self,
        images: List[Image.Image],
        paths: List[str],
        selected_models: List[ModelType],
        logs: List[str]
    ) -> List[EvaluationResult]:
        """Score one batch with every selected, loaded model.

        The final score of an image is the mean of its non-``None`` model
        scores, clipped to ``config.score_range``; it is ``None`` when no
        model produced a score.
        """
        model_predictions: Dict[str, List[Optional[float]]] = {}
        for model_type in selected_models:
            if model_type not in self.models:
                continue
            display_name = self.model_configs[model_type].display_name
            try:
                predictions = await self.models[model_type].predict(images)
                model_predictions[model_type.value] = predictions
                logs.append(f"✓ {display_name} processed batch")
            except Exception as e:
                logs.append(f"✗ {display_name} error: {e}")
                model_predictions[model_type.value] = [None] * len(images)

        batch_results: List[EvaluationResult] = []
        for i, (image, path) in enumerate(zip(images, paths)):
            scores: Dict[str, Optional[float]] = {}
            valid_scores: List[float] = []
            for model_type in selected_models:
                predictions = model_predictions.get(model_type.value)
                # Tolerate missing models and short prediction lists.
                if predictions is not None and i < len(predictions):
                    score = predictions[i]
                else:
                    score = None
                scores[model_type.value] = score
                if score is not None:
                    valid_scores.append(score)

            final_score = None
            if valid_scores:
                final_score = float(np.clip(np.mean(valid_scores), *self.config.score_range))

            # thumbnail() resizes in place, so work on a copy.
            thumbnail = image.copy()
            thumbnail.thumbnail((200, 200), Image.Resampling.LANCZOS)

            batch_results.append(EvaluationResult(
                file_name=Path(path).name,
                file_path=path,
                thumbnail_b64=self._image_to_base64(thumbnail),
                model_scores=scores,
                final_score=final_score
            ))
        return batch_results

    def _image_to_base64(self, image: Image.Image) -> str:
        """Encode ``image`` as a base64 JPEG string."""
        if image.mode not in ("RGB", "L"):
            # Other modes (e.g. RGBA, P) either cannot be written as JPEG or
            # display poorly; convert them first.
            image = image.convert("RGB")
        buffer = BytesIO()
        image.save(buffer, format="JPEG", quality=85, optimize=True)
        return base64.b64encode(buffer.getvalue()).decode('utf-8')

    def get_available_models(self) -> Dict[ModelType, str]:
        """Return the display name of every successfully loaded model."""
        return {
            model_type: self.model_configs[model_type].display_name
            for model_type in self.models
        }

    async def cleanup(self) -> None:
        """Stop the worker, release the models and remove the temp directory."""
        if self._worker_task is not None:
            if not self._worker_task.done():
                await self._processing_queue.put(None)
            try:
                await self._worker_task
            except Exception as e:
                # A crashed worker must not prevent the remaining cleanup.
                logger.error(f"Background worker terminated with error: {e}")
            self._worker_task = None

        for model in self.models.values():
            model.cleanup()
        # Cleaned-up models are unusable; stop advertising them.
        self.models.clear()

        if self._temp_dir.exists():
            shutil.rmtree(self._temp_dir, ignore_errors=True)
        logger.info("Model manager cleanup completed")
# =============================================================================
# Results Processing and Export
# =============================================================================
class ResultsProcessor:
"""Handle result processing, sorting, and export functionality."""
@staticmethod
def sort_results(results: List[EvaluationResult], sort_by: str, reverse: bool = True) -> List[EvaluationResult]:
    """Return a new list of ``results`` ordered by the requested criterion.

    Args:
        results: Evaluation results to order; the input list is not modified.
        sort_by: ``"Final Score"``, ``"File Name"`` or ``"model_<key>"`` where
            ``<key>`` is a :class:`ModelType` value. Anything else falls back
            to ``"Final Score"``.
        reverse: Sort scores in descending order. Ignored for ``"File Name"``,
            which is always ascending and case-insensitive.

    Returns:
        The sorted list. Missing (``None``) scores are ranked as negative
        infinity, i.e. below every real score, including ``0.0``.
    """
    missing = -float('inf')

    if sort_by == "File Name":
        return sorted(results, key=lambda r: r.file_name.lower(), reverse=False)

    model_keys = {f"model_{model_type.value}": model_type.value for model_type in ModelType}
    if sort_by in model_keys:
        score_name = model_keys[sort_by]

        def sort_key(r: EvaluationResult) -> float:
            score = r.model_scores.get(score_name)
            # Explicit None test: ``score or missing`` would wrongly treat a
            # legitimate 0.0 score as missing.
            return missing if score is None else score
    else:
        def sort_key(r: EvaluationResult) -> float:
            return missing if r.final_score is None else r.final_score

    return sorted(results, key=sort_key, reverse=reverse)
@staticmethod
def generate_html_table(results: List[EvaluationResult], selected_models: List[ModelType]) -> str:
    """Render ``results`` as a styled HTML table.

    Args:
        results: Evaluation results, one table row each.
        selected_models: Models to show as score columns, in order.

    Returns:
        An HTML fragment (``<style>`` block plus ``<table>``), or a short
        placeholder paragraph when ``results`` is empty.
    """
    # Imported locally, and under another name, because this function has
    # historically used ``html`` as a local variable name.
    from html import escape

    if not results:
        return "<p>No results to display</p>"

    # CSS styles
    styles = """
<style>
.results-table {
width: 100%; border-collapse: collapse; margin: 20px 0;
font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
}
.results-table th, .results-table td {
border: 1px solid #ddd; padding: 12px; text-align: center;
}
.results-table th {
background-color: #f8f9fa; font-weight: 600; color: #495057;
}
.results-table tr:nth-child(even) { background-color: #f8f9fa; }
.results-table tr:hover { background-color: #e9ecef; }
.image-preview {
max-width: 120px; max-height: 120px; border-radius: 8px;
box-shadow: 0 2px 4px rgba(0,0,0,0.1);
}
.score-excellent { color: #28a745; font-weight: bold; }
.score-good { color: #ffc107; font-weight: bold; }
.score-poor { color: #dc3545; font-weight: bold; }
.score-na { color: #6c757d; font-style: italic; }
</style>
"""
    # Collect fragments and join once instead of repeated ``+=``.
    parts = [styles, '<table class="results-table"><thead><tr>']
    parts.append('<th>Image</th><th>File Name</th>')
    for model_type in selected_models:
        model_name = ModelType(model_type).name.replace('_', ' ').title()
        parts.append(f'<th>{model_name}</th>')
    parts.append('<th>Final Score</th></tr></thead><tbody>')

    for result in results:
        # File names come from user-supplied files; escape them so they can
        # neither break the markup nor inject HTML (quotes are escaped too,
        # which matters inside the alt attribute).
        safe_name = escape(result.file_name)
        parts.append('<tr>')
        parts.append(
            f'<td><img src="data:image/jpeg;base64,{result.thumbnail_b64}" '
            f'class="image-preview" alt="{safe_name}"></td>'
        )
        parts.append(f'<td>{safe_name}</td>')
        # Model scores
        for model_type in selected_models:
            score = result.model_scores.get(model_type.value)
            parts.append(ResultsProcessor._format_score_cell(score))
        # Final score
        parts.append(ResultsProcessor._format_score_cell(result.final_score))
        parts.append('</tr>')

    parts.append('</tbody></table>')
    return "".join(parts)