|
""" |
|
Modern Image Evaluation Tool with Aesthetic and Quality Prediction Models

This refactored version features:
- Modern async/await patterns with proper error handling
- Type hints throughout for better code maintainability
- Dependency injection and factory patterns
- Proper resource management with context managers
- Configuration-driven model loading
- Improved batch processing with memory optimization
- Clean separation of concerns with proper abstraction layers
|
""" |
|
|
|
import asyncio |
|
import base64 |
|
import csv |
|
import logging |
|
import os |
|
import tempfile |
|
import shutil |
|
from contextlib import asynccontextmanager |
|
from dataclasses import dataclass, field |
|
from enum import Enum |
|
from io import BytesIO, StringIO |
|
from pathlib import Path |
|
from typing import Dict, List, Optional, Protocol, Tuple, Union, Any |
|
from abc import ABC, abstractmethod |
|
|
|
import cv2 |
|
import gradio as gr |
|
import numpy as np |
|
import onnxruntime as ort |
|
import torch |
|
import torch.nn as nn |
|
from PIL import Image |
|
from transformers import pipeline |
|
from huggingface_hub import hf_hub_download |
|
|
|
|
|
# Module-level logger used by every class below; the root logger is configured
# at INFO so load/progress messages are visible by default.
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
|
|
|
|
|
|
|
|
|
|
|
|
|
class ModelType(Enum):
    """Identifiers of the scoring models this module knows how to build.

    The string value of each member is the key under which that model's score
    is stored in ``EvaluationResult.model_scores``.
    """

    # Transformers image-classification pipeline.
    AESTHETIC_SHADOW = "aesthetic_shadow"
    # CLIP embeddings + MLP head.
    WAIFU_SCORER = "waifu_scorer"
    # SigLIP-based predictor.
    AESTHETIC_PREDICTOR_V2_5 = "aesthetic_predictor_v2_5"
    # ONNX model.
    ANIME_AESTHETIC = "anime_aesthetic"
|
|
|
|
|
@dataclass
class ModelConfig:
    """Settings handed to a model wrapper when it is constructed."""

    # Machine-readable identifier of the model.
    name: str
    # Human-readable label used in log messages.
    display_name: str
    # Models with enabled=False are skipped when the manager initialises.
    enabled: bool = True
    # NOTE(review): not read anywhere in the visible code; presumably marks
    # models that can score several images per call — confirm.
    batch_supported: bool = True
    # Optional local weights file, used instead of downloading when it exists.
    model_path: Optional[str] = None
    # Optional cache directory forwarded to the Hugging Face Hub download.
    cache_dir: Optional[str] = None
|
|
|
|
|
@dataclass
class ProcessingConfig:
    """Runtime parameters shared by the model manager and its models."""

    # Default for automatic batch-size probing.
    auto_batch: bool = False
    # Default batch size when probing is off.
    manual_batch_size: int = 1
    # Upper bound for the automatic batch-size probe.
    max_batch_size: int = 64
    # Evaluated once, when this class is defined: CUDA if torch reports it.
    device: str = "cuda" if torch.cuda.is_available() else "cpu"
    # (low, high) bounds the averaged final score is clipped to.
    score_range: Tuple[float, float] = (0.0, 10.0)
|
|
|
|
|
@dataclass
class EvaluationResult:
    """Scores and display data for one evaluated image."""

    # Base name of the image file.
    file_name: str
    # Path the image was loaded from.
    file_path: str
    # Base64-encoded JPEG thumbnail for inline HTML display.
    thumbnail_b64: str
    # Per-model score keyed by ModelType value; None when a model gave no score.
    model_scores: Dict[str, Optional[float]] = field(default_factory=dict)
    # Combined score across models, or None when no model produced one.
    final_score: Optional[float] = None
    processing_time: float = 0.0
    # Error description; None means the image counts as a success.
    error: Optional[str] = None
|
|
|
|
|
@dataclass
class BatchResult:
    """Outcome of one processing request covering a list of image files."""

    # One entry per image that was evaluated.
    results: List[EvaluationResult]
    # Human-readable progress and error messages collected along the way.
    logs: List[str]
    # Duration of the request as measured by the caller that builds this object.
    processing_time: float
    # Number of images sent to the models per call.
    batch_size_used: int
    # Number of entries whose ``error`` is None.
    success_count: int
    # Number of failures reported for the request.
    error_count: int
|
|
|
|
|
|
|
|
|
|
|
|
|
class BaseModel(Protocol):
    """Structural interface every scoring-model wrapper must satisfy."""

    async def predict(self, images: List[Image.Image]) -> List[Optional[float]]:
        """Return one score (or None) per input image, in input order."""
        ...

    def is_available(self) -> bool:
        """Report whether the model loaded and can be used for inference."""
        ...

    def cleanup(self) -> None:
        """Release whatever resources the model holds."""
        ...
|
|
|
|
|
class ModernMLP(nn.Module):
    """Fully connected regression head that maps a feature vector to one score.

    Each hidden stage is ``Linear -> activation -> [BatchNorm1d] -> [Dropout]``;
    batch norm is skipped on the last hidden stage and dropout is skipped when
    its rate is 0. A final ``Linear(prev_dim, 1)`` produces the output. The
    layer order is kept stable so existing state dicts keep loading.

    Args:
        input_size: Width of the input feature vector.
        hidden_dims: Width of each hidden stage. Defaults to
            ``[2048, 512, 256, 128, 32]``.
        dropout_rates: Dropout probability per hidden stage. When omitted, the
            defaults ``[0.3, 0.3, 0.2, 0.1, 0.0]`` are used position by
            position, and stages beyond the fifth get no dropout.
        use_batch_norm: Insert ``BatchNorm1d`` after every hidden stage but the
            last.
        activation: Zero-argument factory (typically an ``nn.Module`` subclass)
            called once per hidden stage to create the activation layer.

    Raises:
        ValueError: If ``dropout_rates`` is given explicitly and its length
            differs from the number of hidden stages.
    """

    _DEFAULT_HIDDEN_DIMS = (2048, 512, 256, 128, 32)
    _DEFAULT_DROPOUT_RATES = (0.3, 0.3, 0.2, 0.1, 0.0)

    def __init__(
        self,
        input_size: int,
        hidden_dims: Optional[List[int]] = None,
        dropout_rates: Optional[List[float]] = None,
        use_batch_norm: bool = True,
        activation: nn.Module = nn.ReLU,
    ):
        super().__init__()

        if hidden_dims is None:
            hidden_dims = list(self._DEFAULT_HIDDEN_DIMS)

        if dropout_rates is None:
            # Align the default rates with however many hidden stages there
            # are, so that zip() below can never silently drop a stage.
            defaults = self._DEFAULT_DROPOUT_RATES
            dropout_rates = [
                defaults[i] if i < len(defaults) else 0.0
                for i in range(len(hidden_dims))
            ]
        elif len(dropout_rates) != len(hidden_dims):
            raise ValueError(
                f"dropout_rates has {len(dropout_rates)} entries but hidden_dims "
                f"has {len(hidden_dims)}; they must match"
            )

        layers: List[nn.Module] = []
        prev_dim = input_size
        last_hidden = len(hidden_dims) - 1

        for i, (hidden_dim, dropout_rate) in enumerate(zip(hidden_dims, dropout_rates)):
            layers.append(nn.Linear(prev_dim, hidden_dim))
            layers.append(activation())

            if use_batch_norm and i < last_hidden:
                layers.append(nn.BatchNorm1d(hidden_dim))

            if dropout_rate > 0:
                layers.append(nn.Dropout(dropout_rate))

            prev_dim = hidden_dim

        # Single-unit output layer: one score per input row.
        layers.append(nn.Linear(prev_dim, 1))
        self.network = nn.Sequential(*layers)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Run ``x`` through the layer stack and return the raw scores."""
        return self.network(x)
|
|
|
|
|
class WaifuScorerModel:
    """Score images with CLIP ViT-L/14 embeddings fed to a ``ModernMLP`` head.

    Construction never raises: any failure while importing ``clip``, locating
    the weights, or loading either network is logged and leaves the instance
    unavailable (``is_available()`` returns ``False``).
    """

    # Hugging Face Hub location of the default MLP-head checkpoint.
    _HF_REPO_ID = "Eugeoter/waifu-scorer-v3"
    _HF_FILENAME = "model.pth"

    def __init__(self, config: ModelConfig, device: str):
        self.config = config
        self.device = device
        self.dtype = torch.float32
        self._available = False
        self._model = None
        self._clip_model = None
        self._preprocess = None

        self._initialize_model()

    def _initialize_model(self) -> None:
        """Load the MLP head and the CLIP encoder, logging any failure."""
        try:
            # Imported lazily so a missing ``clip`` package only disables this model.
            import clip

            model_path = self._get_model_path()
            self._model = ModernMLP(input_size=768)

            if model_path.endswith(".safetensors"):
                from safetensors.torch import load_file

                state_dict = load_file(model_path)
            else:
                # SECURITY NOTE(review): torch.load unpickles the checkpoint,
                # which can execute arbitrary code if the file is untrusted.
                # Prefer a .safetensors checkpoint or weights_only=True where
                # the installed torch version supports it.
                state_dict = torch.load(model_path, map_location=self.device)

            self._model.load_state_dict(state_dict)
            self._model.to(self.device)
            self._model.eval()

            self._clip_model, self._preprocess = clip.load("ViT-L/14", device=self.device)
            self._available = True

            logger.info(f"WaifuScorer model loaded successfully on {self.device}")

        except Exception as e:
            logger.error(f"Failed to initialize WaifuScorer: {e}")
            self._available = False

    def _get_model_path(self) -> str:
        """Return the configured local weights file, or download the default."""
        if self.config.model_path and os.path.isfile(self.config.model_path):
            return self.config.model_path

        return hf_hub_download(
            repo_id=self._HF_REPO_ID,
            filename=self._HF_FILENAME,
            cache_dir=self.config.cache_dir,
        )

    async def predict(self, images: List[Image.Image]) -> List[Optional[float]]:
        """Return a score in [0, 10] per image, or ``None`` for each on failure.

        Args:
            images: PIL images to score.

        Returns:
            One entry per input image, in input order. All entries are ``None``
            when the model is unavailable or inference raises.
        """
        if not self._available:
            return [None] * len(images)
        if not images:
            # Nothing to score; avoids concatenating an empty tensor list.
            return []

        try:
            # A single image is duplicated so the networks always see a batch
            # of at least two; the duplicate's score is dropped on return.
            batch_images = images * 2 if len(images) == 1 else images

            image_batch = torch.stack(
                [self._preprocess(img) for img in batch_images]
            ).to(self.device)

            with torch.no_grad():
                image_features = self._clip_model.encode_image(image_batch)

                # L2-normalise, guarding against division by a zero norm.
                norm = image_features.norm(2, dim=-1, keepdim=True)
                norm[norm == 0] = 1
                normalized_features = (image_features / norm).to(
                    device=self.device, dtype=self.dtype
                )

                predictions = self._model(normalized_features)
                scores = predictions.clamp(0, 10).cpu().numpy().reshape(-1).tolist()

            return scores[:len(images)]

        except Exception as e:
            logger.error(f"Error in WaifuScorer prediction: {e}")
            return [None] * len(images)

    def is_available(self) -> bool:
        """Report whether both networks loaded successfully."""
        return self._available

    def cleanup(self) -> None:
        """Drop both networks and release cached CUDA memory.

        Safe to call more than once; afterwards the model reports itself as
        unavailable and ``predict`` returns ``None`` scores.
        """
        self._available = False
        self._model = None
        self._clip_model = None
        self._preprocess = None
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
|
|
|
|
|
class AestheticShadowModel:
    """Aesthetic Shadow scorer backed by a transformers image-classification pipeline.

    Construction never raises; a failed load is logged and leaves the model
    unavailable.
    """

    def __init__(self, config: ModelConfig, device: str):
        self.config = config
        self.device = device
        self._available = False
        self._model = None

        self._initialize_model()

    def _initialize_model(self) -> None:
        """Create the classification pipeline, logging any failure."""
        try:
            self._model = pipeline(
                "image-classification",
                model="NeoChen1024/aesthetic-shadow-v2-backup",
                device=self.device,
            )
            self._available = True
            logger.info("Aesthetic Shadow model loaded successfully")

        except Exception as e:
            logger.error(f"Failed to initialize Aesthetic Shadow: {e}")
            self._available = False

    async def predict(self, images: List[Image.Image]) -> List[Optional[float]]:
        """Return the 'hq' class probability scaled to [0, 10] for each image.

        Args:
            images: PIL images to score.

        Returns:
            One entry per image. An entry is ``None`` when its result has no
            usable 'hq' label; all entries are ``None`` when the model is
            unavailable or the pipeline call raises.
        """
        if not self._available:
            return [None] * len(images)
        if not images:
            return []

        try:
            results = self._model(images)
        except Exception as e:
            logger.error(f"Error in Aesthetic Shadow prediction: {e}")
            return [None] * len(images)

        scores: List[Optional[float]] = []
        for result in results:
            try:
                hq_score = next(p for p in result if p['label'] == 'hq')['score']
                scores.append(float(np.clip(hq_score * 10.0, 0.0, 10.0)))
            except (StopIteration, KeyError, TypeError):
                # No 'hq' entry, or an unexpected result shape, for this image.
                scores.append(None)

        return scores

    def is_available(self) -> bool:
        """Report whether the pipeline loaded successfully."""
        return self._available

    def cleanup(self) -> None:
        """Drop the pipeline and release cached CUDA memory; safe to call repeatedly."""
        self._available = False
        self._model = None
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
|
|
|
|
|
class AestheticPredictorV25Model:
    """Wrapper around the ``aesthetic_predictor_v2_5`` SigLIP-based scorer.

    The model runs in bfloat16 on the configured device when that device is a
    CUDA device and CUDA is available; otherwise it stays where
    ``convert_v2_5_from_siglip`` created it. Construction never raises; a
    failed load is logged and leaves the model unavailable.
    """

    def __init__(self, config: ModelConfig, device: str):
        self.config = config
        self.device = device
        self._available = False
        self._model = None
        self._preprocessor = None
        # Honour the requested device instead of always using CUDA when present.
        self._use_cuda = torch.cuda.is_available() and str(device).startswith("cuda")

        self._initialize_model()

    def _initialize_model(self) -> None:
        """Build the model and its preprocessor, logging any failure."""
        try:
            # Imported lazily so a missing package only disables this model.
            from aesthetic_predictor_v2_5 import convert_v2_5_from_siglip

            self._model, self._preprocessor = convert_v2_5_from_siglip(
                low_cpu_mem_usage=True,
                trust_remote_code=True,
            )

            if self._use_cuda:
                self._model = self._model.to(torch.bfloat16).to(self.device)

            self._available = True
            logger.info("Aesthetic Predictor V2.5 loaded successfully")

        except Exception as e:
            logger.error(f"Failed to initialize Aesthetic Predictor V2.5: {e}")
            self._available = False

    async def predict(self, images: List[Image.Image]) -> List[Optional[float]]:
        """Return a score clipped to [0, 10] and rounded to 4 decimals per image.

        Args:
            images: PIL images; each is converted to RGB before preprocessing.

        Returns:
            One entry per image, or all ``None`` when the model is unavailable
            or inference raises.
        """
        if not self._available:
            return [None] * len(images)
        if not images:
            return []

        try:
            rgb_images = [img.convert("RGB") for img in images]
            pixel_values = self._preprocessor(images=rgb_images, return_tensors="pt").pixel_values

            if self._use_cuda:
                # Match the dtype and device the model was moved to at load time.
                pixel_values = pixel_values.to(torch.bfloat16).to(self.device)

            with torch.inference_mode():
                logits = self._model(pixel_values).logits

            # Flatten so a single-image batch is handled like any other.
            scores = logits.float().cpu().numpy().reshape(-1)

            return [float(np.round(np.clip(s, 0.0, 10.0), 4)) for s in scores]

        except Exception as e:
            logger.error(f"Error in Aesthetic Predictor V2.5 prediction: {e}")
            return [None] * len(images)

    def is_available(self) -> bool:
        """Report whether the model loaded successfully."""
        return self._available

    def cleanup(self) -> None:
        """Drop the model and preprocessor; safe to call repeatedly."""
        self._available = False
        self._model = None
        self._preprocessor = None
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
|
|
|
|
|
class AnimeAestheticModel:
    """Anime aesthetic scorer running an ONNX model on the CPU provider.

    Images are scored one at a time. Construction never raises; a failed load
    is logged and leaves the model unavailable.
    """

    # Side length of the square canvas fed to the ONNX session.
    _INPUT_SIZE = 768

    def __init__(self, config: ModelConfig, device: str):
        self.config = config
        self.device = device
        self._available = False
        self._session = None

        self._initialize_model()

    def _initialize_model(self) -> None:
        """Locate (or download) the ONNX file and open an inference session."""
        try:
            local_path = self.config.model_path
            if local_path and os.path.isfile(local_path):
                model_path = local_path
            else:
                model_path = hf_hub_download(
                    repo_id="skytnt/anime-aesthetic",
                    filename="model.onnx",
                    cache_dir=self.config.cache_dir,
                )

            self._session = ort.InferenceSession(model_path, providers=['CPUExecutionProvider'])
            self._available = True
            logger.info("Anime Aesthetic model loaded successfully")

        except Exception as e:
            logger.error(f"Failed to initialize Anime Aesthetic: {e}")
            self._available = False

    async def predict(self, images: List[Image.Image]) -> List[Optional[float]]:
        """Return the model output scaled to [0, 10] for each image.

        Args:
            images: PIL images to score.

        Returns:
            One entry per image; an entry is ``None`` when scoring that image
            raised, and all entries are ``None`` when the model is unavailable.
        """
        if not self._available:
            return [None] * len(images)

        scores: List[Optional[float]] = []
        for img in images:
            try:
                score = self._predict_single(img)
                scores.append(float(np.clip(score * 10.0, 0.0, 10.0)))
            except Exception as e:
                logger.error(f"Error predicting anime aesthetic for image: {e}")
                scores.append(None)

        return scores

    def _predict_single(self, img: Image.Image) -> float:
        """Letterbox ``img`` onto a square canvas and run it through the session.

        The image is scaled so its longer side equals the canvas size, centred
        on a black canvas, and passed as a ``(1, 3, size, size)`` float32 array
        with values in [0, 1].
        """
        # Force three channels so the canvas assignment below always fits.
        img_np = np.array(img.convert("RGB")).astype(np.float32) / 255.0
        s = self._INPUT_SIZE
        h, w = img_np.shape[:2]

        # Keep the aspect ratio; clamp to 1 px so extreme ratios never yield a
        # zero-sized resize target.
        if h > w:
            new_h, new_w = s, max(1, int(s * w / h))
        else:
            new_h, new_w = max(1, int(s * h / w)), s

        resized = cv2.resize(img_np, (new_w, new_h))

        canvas = np.zeros((s, s, 3), dtype=np.float32)
        pad_h = (s - new_h) // 2
        pad_w = (s - new_w) // 2
        canvas[pad_h:pad_h + new_h, pad_w:pad_w + new_w] = resized

        # HWC -> CHW, then add the batch axis.
        input_tensor = np.transpose(canvas, (2, 0, 1))[np.newaxis, :]
        return self._session.run(None, {"img": input_tensor})[0].item()

    def is_available(self) -> bool:
        """Report whether the ONNX session was created successfully."""
        return self._available

    def cleanup(self) -> None:
        """Drop the inference session; safe to call repeatedly."""
        self._available = False
        self._session = None
|
|
|
|
|
|
|
|
|
|
|
|
|
class ModelFactory:
    """Maps each ``ModelType`` to its wrapper class and instantiates it."""

    # Registry of known model types and the class implementing each.
    _MODEL_CLASSES = {
        ModelType.AESTHETIC_SHADOW: AestheticShadowModel,
        ModelType.WAIFU_SCORER: WaifuScorerModel,
        ModelType.AESTHETIC_PREDICTOR_V2_5: AestheticPredictorV25Model,
        ModelType.ANIME_AESTHETIC: AnimeAestheticModel,
    }

    @classmethod
    def create_model(cls, model_type: ModelType, config: ModelConfig, device: str) -> BaseModel:
        """Build the wrapper registered for ``model_type``.

        Raises:
            ValueError: If ``model_type`` has no registered class.
        """
        registry = cls._MODEL_CLASSES
        if model_type not in registry:
            raise ValueError(f"Unknown model type: {model_type}")

        wrapper_cls = registry[model_type]
        return wrapper_cls(config, device)
|
|
|
|
|
class ModelManager:
    """Load the enabled scoring models and run queued evaluation requests.

    Requests submitted through :meth:`process_images` are placed on an
    ``asyncio.Queue`` and handled one at a time by a background task, so model
    inference for different requests never overlaps.
    """

    def __init__(self, processing_config: ProcessingConfig):
        self.config = processing_config
        self.models: Dict[ModelType, BaseModel] = {}
        self.model_configs = self._create_default_configs()
        self._processing_queue = asyncio.Queue()
        self._worker_task: Optional[asyncio.Task] = None
        self._temp_dir = Path(tempfile.mkdtemp())

        self._initialize_models()

    def _create_default_configs(self) -> Dict[ModelType, ModelConfig]:
        """Create default model configurations."""
        return {
            ModelType.AESTHETIC_SHADOW: ModelConfig(
                name="aesthetic_shadow",
                display_name="Aesthetic Shadow",
            ),
            ModelType.WAIFU_SCORER: ModelConfig(
                name="waifu_scorer",
                display_name="Waifu Scorer",
            ),
            ModelType.AESTHETIC_PREDICTOR_V2_5: ModelConfig(
                name="aesthetic_predictor_v2_5",
                display_name="Aesthetic V2.5",
            ),
            ModelType.ANIME_AESTHETIC: ModelConfig(
                name="anime_aesthetic",
                display_name="Anime Score",
                batch_supported=False,
            ),
        }

    def _initialize_models(self) -> None:
        """Instantiate every enabled model and keep those that report available."""
        logger.info("Initializing models...")

        for model_type, config in self.model_configs.items():
            if not config.enabled:
                continue
            try:
                model = ModelFactory.create_model(model_type, config, self.config.device)
            except Exception as e:
                logger.error(f"✗ {config.display_name} initialization error: {e}")
                continue

            if model.is_available():
                self.models[model_type] = model
                logger.info(f"✓ {config.display_name} loaded successfully")
            else:
                logger.warning(f"✗ {config.display_name} failed to load")

        logger.info(f"Initialized {len(self.models)} models successfully")

    async def start_worker(self) -> None:
        """Start the background worker unless one is already running."""
        if self._worker_task is None or self._worker_task.done():
            self._worker_task = asyncio.create_task(self._worker_loop())
            logger.info("Background worker started")

    async def _worker_loop(self) -> None:
        """Process queued requests until the ``None`` shutdown sentinel arrives."""
        while True:
            request = await self._processing_queue.get()
            try:
                if request is None:
                    break

                future = request['future']
                try:
                    result = await self._process_request(request)
                except asyncio.CancelledError:
                    # Worker is being cancelled: release the waiting caller too.
                    future.cancel()
                    raise
                except Exception as e:
                    # The caller may have cancelled its future (e.g. a timeout);
                    # setting a result on it would raise and kill this worker.
                    if not future.done():
                        future.set_exception(e)
                else:
                    if not future.done():
                        future.set_result(result)
            finally:
                # Runs for the sentinel as well, so queue.join() can complete.
                self._processing_queue.task_done()

    async def process_images(
        self,
        file_paths: List[str],
        selected_models: List[ModelType],
        auto_batch: bool = False,
        manual_batch_size: int = 1,
    ) -> BatchResult:
        """Queue an evaluation request and wait for its result.

        Args:
            file_paths: Image files to evaluate.
            selected_models: Models whose scores should be computed.
            auto_batch: Probe for a batch size instead of using
                ``manual_batch_size``.
            manual_batch_size: Batch size used when ``auto_batch`` is false.

        Returns:
            The ``BatchResult`` produced by the background worker.
        """
        # Without a running worker the future below would never resolve.
        await self.start_worker()

        future = asyncio.get_running_loop().create_future()
        request = {
            'file_paths': file_paths,
            'selected_models': selected_models,
            'auto_batch': auto_batch,
            'manual_batch_size': manual_batch_size,
            'future': future,
        }

        await self._processing_queue.put(request)
        return await future

    async def _process_request(self, request: Dict) -> BatchResult:
        """Load the request's images, score them batch by batch, and summarise."""
        loop = asyncio.get_running_loop()
        start_time = loop.time()
        logs: List[str] = []
        results: List[EvaluationResult] = []

        file_paths = request['file_paths']
        selected_models = request['selected_models']
        auto_batch = request['auto_batch']
        manual_batch_size = request['manual_batch_size']

        images, valid_paths = await self._load_images(file_paths, logs)

        if not images:
            return BatchResult([], logs, 0.0, 0, 0, len(file_paths))

        # Files that could not be opened never reach ``results`` but still
        # count as errors for the request.
        load_failures = len(file_paths) - len(images)

        batch_size = max(
            1, await self._determine_batch_size(images, auto_batch, manual_batch_size, logs)
        )

        for start in range(0, len(images), batch_size):
            batch_images = images[start:start + batch_size]
            batch_paths = valid_paths[start:start + batch_size]

            batch_results = await self._process_batch(batch_images, batch_paths, selected_models, logs)
            results.extend(batch_results)

        processing_time = loop.time() - start_time
        success_count = sum(1 for r in results if r.error is None)
        error_count = (len(results) - success_count) + load_failures

        return BatchResult(
            results=results,
            logs=logs,
            processing_time=processing_time,
            batch_size_used=batch_size,
            success_count=success_count,
            error_count=error_count,
        )

    async def _load_images(self, file_paths: List[str], logs: List[str]) -> Tuple[List[Image.Image], List[str]]:
        """Open each path as an RGB image, logging and skipping unreadable files.

        Returns:
            The loaded images and the paths they came from, index-aligned.
        """
        images: List[Image.Image] = []
        valid_paths: List[str] = []

        logs.append(f"Loading {len(file_paths)} images...")

        for path in file_paths:
            try:
                # The context manager closes the file handle; convert() returns
                # an independent in-memory copy.
                with Image.open(path) as source:
                    img = source.convert("RGB")
            except Exception as e:
                logs.append(f"Failed to load {path}: {e}")
            else:
                images.append(img)
                valid_paths.append(path)

        logs.append(f"Successfully loaded {len(images)} images")
        return images, valid_paths

    async def _determine_batch_size(
        self,
        images: List[Image.Image],
        auto_batch: bool,
        manual_batch_size: int,
        logs: List[str],
    ) -> int:
        """Pick the batch size for a request (always at least 1).

        Without ``auto_batch`` this is ``manual_batch_size`` capped at the
        number of images. With it, the first image is repeated in batches of
        1, 2, 4, ... against the first two loaded models until a batch fails
        or the limit (image count or ``max_batch_size``) is passed; the last
        size that worked is returned.
        """
        if not auto_batch:
            return max(1, min(manual_batch_size, len(images)))

        limit = min(len(images), self.config.max_batch_size)
        probe_models = list(self.models.values())[:2]
        test_image = images[0:1]

        candidate = 1
        while candidate <= limit:
            if not await self._probe_batch(probe_models, test_image * candidate):
                if torch.cuda.is_available():
                    # Give back memory held by the failed attempt.
                    torch.cuda.empty_cache()
                break
            candidate *= 2

        # ``candidate`` is the first size that failed or exceeded the limit.
        optimal_batch = max(1, candidate // 2)
        logs.append(f"Auto-tuned batch size: {optimal_batch}")
        return optimal_batch

    async def _probe_batch(self, models: List[BaseModel], batch: List[Image.Image]) -> bool:
        """Return True when every model in ``models`` scores ``batch`` without failing."""
        try:
            for model in models:
                predictions = await model.predict(batch)
                # The model wrappers catch inference errors and answer with
                # all-None, so that is the failure signal to look for here.
                if predictions and all(p is None for p in predictions):
                    return False
        except Exception:
            return False
        return True

    async def _process_batch(
        self,
        images: List[Image.Image],
        paths: List[str],
        selected_models: List[ModelType],
        logs: List[str],
    ) -> List[EvaluationResult]:
        """Score one batch with every selected model and build per-image results.

        The final score of an image is the mean of the scores that are not
        ``None``, clipped to ``config.score_range``; it is ``None`` when no
        model produced a score.
        """
        model_predictions: Dict[str, List[Optional[float]]] = {}
        for model_type in selected_models:
            model = self.models.get(model_type)
            if model is None:
                # Selected but not loaded: its score stays None below.
                continue

            display_name = self.model_configs[model_type].display_name
            try:
                predictions = await model.predict(images)
            except Exception as e:
                logs.append(f"✗ {display_name} error: {e}")
                predictions = [None] * len(images)
            else:
                logs.append(f"✓ {display_name} processed batch")

            if len(predictions) != len(images):
                # Scores can no longer be matched to images reliably.
                logs.append(
                    f"✗ {display_name} returned {len(predictions)} scores for {len(images)} images"
                )
                predictions = [None] * len(images)

            model_predictions[model_type.value] = predictions

        batch_results: List[EvaluationResult] = []
        for i, (image, path) in enumerate(zip(images, paths)):
            scores: Dict[str, Optional[float]] = {}
            valid_scores: List[float] = []

            for model_type in selected_models:
                predictions = model_predictions.get(model_type.value)
                score = predictions[i] if predictions is not None else None
                scores[model_type.value] = score
                if score is not None:
                    valid_scores.append(score)

            final_score = None
            if valid_scores:
                final_score = float(np.clip(np.mean(valid_scores), *self.config.score_range))

            # thumbnail() resizes in place, so work on a copy.
            thumbnail = image.copy()
            thumbnail.thumbnail((200, 200), Image.Resampling.LANCZOS)
            thumbnail_b64 = self._image_to_base64(thumbnail)

            batch_results.append(
                EvaluationResult(
                    file_name=Path(path).name,
                    file_path=path,
                    thumbnail_b64=thumbnail_b64,
                    model_scores=scores,
                    final_score=final_score,
                )
            )

        return batch_results

    def _image_to_base64(self, image: Image.Image) -> str:
        """Encode ``image`` as a quality-85 JPEG and return it base64-encoded."""
        if image.mode not in ("RGB", "L"):
            # JPEG cannot store alpha or palette data.
            image = image.convert("RGB")

        buffer = BytesIO()
        image.save(buffer, format="JPEG", quality=85, optimize=True)
        return base64.b64encode(buffer.getvalue()).decode('utf-8')

    def get_available_models(self) -> Dict[ModelType, str]:
        """Return the display name of every successfully loaded model."""
        return {
            model_type: self.model_configs[model_type].display_name
            for model_type in self.models
        }

    async def cleanup(self) -> None:
        """Stop the worker, release all models, and delete the temp directory.

        Safe to call more than once; the worker can be started again afterwards.
        """
        worker = self._worker_task
        if worker is not None:
            if not worker.done():
                await self._processing_queue.put(None)
            try:
                await worker
            except Exception as e:
                logger.error(f"Background worker ended with an error: {e}")
            finally:
                self._worker_task = None

        for model in self.models.values():
            model.cleanup()
        self.models.clear()

        if self._temp_dir.exists():
            shutil.rmtree(self._temp_dir, ignore_errors=True)

        logger.info("Model manager cleanup completed")
|
|
|
|
|
|
|
|
|
|
|
|
|
class ResultsProcessor: |
|
"""Handle result processing, sorting, and export functionality.""" |
|
|
|
@staticmethod
def sort_results(results: List[EvaluationResult], sort_by: str, reverse: bool = True) -> List[EvaluationResult]:
    """Return ``results`` as a new list ordered by the chosen criterion.

    Args:
        results: Evaluation results to order.
        sort_by: ``"Final Score"``, ``"File Name"``, or ``"model_<value>"``
            where ``<value>`` is a ``ModelType`` value. Any other string
            falls back to ``"Final Score"``.
        reverse: Sort descending. Ignored for ``"File Name"``, which is
            always ascending and case-insensitive.

    Returns:
        A sorted copy. Missing (``None``) scores rank below every real
        score, including a legitimate score of 0.0.
    """
    missing = -float('inf')

    def model_key(score_name: str):
        # Explicit None check: ``score or missing`` would also demote 0.0.
        def key(r):
            score = r.model_scores.get(score_name)
            return score if score is not None else missing
        return key

    sort_key_map = {
        "Final Score": lambda r: r.final_score if r.final_score is not None else missing,
        "File Name": lambda r: r.file_name.lower(),
    }
    for model_type in ModelType:
        sort_key_map[f"model_{model_type.value}"] = model_key(model_type.value)

    sort_key = sort_key_map.get(sort_by, sort_key_map["Final Score"])
    return sorted(results, key=sort_key, reverse=reverse and sort_by != "File Name")
|
|
|
@staticmethod
def generate_html_table(results: List[EvaluationResult], selected_models: List[ModelType]) -> str:
    """Render ``results`` as a styled HTML table.

    Columns are: thumbnail, file name, one column per selected model, and the
    final score. Score cells are produced by
    ``ResultsProcessor._format_score_cell``.

    Args:
        results: Rows to render.
        selected_models: Models to show, as ``ModelType`` members or their
            string values.

    Returns:
        The HTML fragment, or a short "no results" paragraph when
        ``results`` is empty.
    """
    # Local import: the name ``html`` is not imported at module level.
    from html import escape

    if not results:
        return "<p>No results to display</p>"

    styles = """
    <style>
        .results-table {
            width: 100%; border-collapse: collapse; margin: 20px 0;
            font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
        }
        .results-table th, .results-table td {
            border: 1px solid #ddd; padding: 12px; text-align: center;
        }
        .results-table th {
            background-color: #f8f9fa; font-weight: 600; color: #495057;
        }
        .results-table tr:nth-child(even) { background-color: #f8f9fa; }
        .results-table tr:hover { background-color: #e9ecef; }
        .image-preview {
            max-width: 120px; max-height: 120px; border-radius: 8px;
            box-shadow: 0 2px 4px rgba(0,0,0,0.1);
        }
        .score-excellent { color: #28a745; font-weight: bold; }
        .score-good { color: #ffc107; font-weight: bold; }
        .score-poor { color: #dc3545; font-weight: bold; }
        .score-na { color: #6c757d; font-style: italic; }
    </style>
    """

    # Normalise once so headers and cells agree, whether callers pass enum
    # members or raw values.
    model_types = [ModelType(m) for m in selected_models]

    parts = [
        styles,
        '<table class="results-table"><thead><tr>',
        '<th>Image</th><th>File Name</th>',
    ]
    for model_type in model_types:
        model_name = model_type.name.replace('_', ' ').title()
        parts.append(f'<th>{model_name}</th>')
    parts.append('<th>Final Score</th></tr></thead><tbody>')

    for result in results:
        # File names come from user-supplied paths; escape them for both the
        # attribute and the text context.
        safe_name = escape(result.file_name)

        parts.append('<tr>')
        parts.append(
            f'<td><img src="data:image/jpeg;base64,{result.thumbnail_b64}" '
            f'class="image-preview" alt="{safe_name}"></td>'
        )
        parts.append(f'<td>{safe_name}</td>')

        for model_type in model_types:
            score = result.model_scores.get(model_type.value)
            parts.append(ResultsProcessor._format_score_cell(score))

        parts.append(ResultsProcessor._format_score_cell(result.final_score))
        parts.append('</tr>')

    parts.append('</tbody></table>')
    return "".join(parts)