|
import os |
|
import tempfile |
|
import base64 |
|
from io import BytesIO |
|
from typing import List, Dict, Any, Optional, Tuple |
|
from dataclasses import dataclass |
|
from pathlib import Path |
|
|
|
import cv2 |
|
import numpy as np |
|
import torch |
|
import onnxruntime as rt |
|
from PIL import Image |
|
import gradio as gr |
|
import pandas as pd |
|
from transformers import pipeline |
|
from huggingface_hub import hf_hub_download |
|
|
|
|
|
from aesthetic_predictor_v2_5 import convert_v2_5_from_siglip |
|
|
|
|
|
@dataclass
class EvaluationResult:
    """Per-image record: the source image plus one optional score per model.

    Only ``file_name`` and ``image`` are required. Every score field starts
    as ``None`` and stays ``None`` until a value is assigned to it.
    """

    # Identification of the evaluated image.
    file_name: str
    image: Image.Image

    # One slot per scoring model; the attribute names double as model keys.
    aesthetic_shadow: Optional[float] = None
    waifu_scorer: Optional[float] = None
    aesthetic_v2_5: Optional[float] = None
    anime_aesthetic: Optional[float] = None

    # Combined score derived from the individual model scores.
    final_score: Optional[float] = None
|
|
|
|
|
class MLP(torch.nn.Module):
    """Feed-forward regression head that maps a feature vector to one value.

    Layout: two ``Linear -> ReLU -> BatchNorm1d -> Dropout`` stages (widths
    1024 and 256, dropout 0.2 and 0.1), then ``Linear(256, 64) -> ReLU ->
    Linear(64, 1)``.

    Args:
        input_size: Width of the incoming feature vector (default 768).
    """

    def __init__(self, input_size: int = 768):
        super().__init__()
        nn = torch.nn
        stages = [
            # Stage 1: project the input features up to 1024 units.
            nn.Linear(input_size, 1024),
            nn.ReLU(),
            nn.BatchNorm1d(1024),
            nn.Dropout(0.2),
            # Stage 2: narrow to 256 units.
            nn.Linear(1024, 256),
            nn.ReLU(),
            nn.BatchNorm1d(256),
            nn.Dropout(0.1),
            # Output head: 256 -> 64 -> 1.
            nn.Linear(256, 64),
            nn.ReLU(),
            nn.Linear(64, 1),
        ]
        # The attribute name "network" is the prefix of every state-dict key.
        self.network = nn.Sequential(*stages)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Run ``x`` through the layer stack and return the result."""
        return self.network(x)
|
|
|
|
|
class ModelLoader:
    """Load every scoring model once and expose them through ``self.models``.

    ``self.models`` is keyed by model identifier (``'aesthetic_shadow'``,
    ``'waifu_scorer'``, ``'aesthetic_v2_5'``, ``'anime_aesthetic'``). A model
    that could not be loaded is stored as ``None`` so that one failure never
    prevents the remaining models from loading.

    Args:
        device: Torch device string. When omitted, ``'cuda'`` is used if
            ``torch.cuda.is_available()`` and ``'cpu'`` otherwise.
    """

    def __init__(self, device: Optional[str] = None):
        self.device = device or ('cuda' if torch.cuda.is_available() else 'cpu')
        self.models: Dict[str, Any] = {}
        self._load_all_models()

    def _load_all_models(self):
        """Run every loader and report whether all models are available."""
        loaders = {
            'aesthetic_shadow': self._load_aesthetic_shadow,
            'waifu_scorer': self._load_waifu_scorer,
            'aesthetic_v2_5': self._load_aesthetic_v2_5,
            'anime_aesthetic': self._load_anime_aesthetic,
        }
        for name, load in loaders.items():
            try:
                load()
            except Exception as e:
                # Last-resort boundary: each loader handles its own errors,
                # so this only fires on something unexpected.
                print(f"β Error loading models: {e}")
                self.models[name] = None

        missing = [name for name in loaders if self.models.get(name) is None]
        if missing:
            # Do not claim success when some models are unavailable.
            print(f"Models not available: {', '.join(missing)}")
        else:
            print("✅ All models loaded successfully!")

    def _load_aesthetic_shadow(self):
        """Load the Aesthetic Shadow image-classification pipeline."""
        print("π Loading Aesthetic Shadow...")
        try:
            self.models['aesthetic_shadow'] = pipeline(
                "image-classification",
                model="NeoChen1024/aesthetic-shadow-v2-backup",
                device=self.device,
            )
        except Exception as e:
            # Same best-effort policy as the other loaders: record the model
            # as unavailable instead of aborting the remaining loads.
            print(f"Aesthetic Shadow not available: {e}")
            self.models['aesthetic_shadow'] = None

    def _load_waifu_scorer(self):
        """Load the Waifu Scorer MLP head together with its CLIP encoder."""
        print("π Loading Waifu Scorer...")
        try:
            # Imported lazily so a missing ``clip`` package only disables this model.
            import clip

            model_path = hf_hub_download("Eugeoter/waifu-scorer-v3", "model.pth")
            mlp = MLP()
            # NOTE(review): torch.load unpickles the downloaded file; consider
            # weights_only=True if the supported torch versions allow it.
            state_dict = torch.load(model_path, map_location=self.device)
            mlp.load_state_dict(state_dict)
            mlp.to(self.device).eval()

            clip_model, preprocess = clip.load("ViT-L/14", device=self.device)

            self.models['waifu_scorer'] = {
                'mlp': mlp,
                'clip_model': clip_model,
                'preprocess': preprocess,
            }
        except Exception as e:
            print(f"β οΈ Waifu Scorer not available: {e}")
            self.models['waifu_scorer'] = None

    def _load_aesthetic_v2_5(self):
        """Load Aesthetic Predictor V2.5 and its preprocessor."""
        print("π Loading Aesthetic V2.5...")
        try:
            model, preprocessor = convert_v2_5_from_siglip(
                low_cpu_mem_usage=True,
                trust_remote_code=True,
            )
            # NOTE(review): placement follows CUDA availability rather than
            # self.device, so an explicit device argument is not honoured here.
            if torch.cuda.is_available():
                model = model.to(torch.bfloat16).cuda()

            self.models['aesthetic_v2_5'] = {
                'model': model,
                'preprocessor': preprocessor,
            }
        except Exception as e:
            print(f"β οΈ Aesthetic V2.5 not available: {e}")
            self.models['aesthetic_v2_5'] = None

    def _load_anime_aesthetic(self):
        """Load the Anime Aesthetic ONNX model into a CPU inference session."""
        print("π Loading Anime Aesthetic...")
        try:
            model_path = hf_hub_download("skytnt/anime-aesthetic", "model.onnx")
            self.models['anime_aesthetic'] = rt.InferenceSession(
                model_path,
                providers=['CPUExecutionProvider'],
            )
        except Exception as e:
            print(f"β οΈ Anime Aesthetic not available: {e}")
            self.models['anime_aesthetic'] = None
|
|
|
|
|
class ImageEvaluator:
    """Score PIL images with the models held by a ``ModelLoader``.

    Images are processed batch by batch. Each selected model contributes one
    score per image, clipped to the range 0-10, and the final score is the
    mean of the selected scores that are not ``None``.
    """

    def __init__(self):
        self.loader = ModelLoader()
        # Fresh scratch directory for generated files; this class never removes it.
        self.temp_dir = Path(tempfile.mkdtemp())

    def evaluate_images(
        self,
        images: List[Image.Image],
        file_names: List[str],
        selected_models: List[str],
        batch_size: int = 4,
        progress_callback=None
    ) -> List[EvaluationResult]:
        """Evaluate ``images`` with ``selected_models`` in batches.

        Args:
            images: Images to score.
            file_names: Names paired positionally with ``images``.
            selected_models: Model keys to run (e.g. ``'aesthetic_shadow'``).
            batch_size: Images per batch; values below 1 are treated as 1.
            progress_callback: Optional ``callback(fraction, message)`` invoked
                before each batch is processed.

        Returns:
            One ``EvaluationResult`` per processed image, in input order.
        """
        # range() needs a positive step and the batch count divides by it.
        batch_size = max(1, int(batch_size))
        results = []
        total_batches = (len(images) + batch_size - 1) // batch_size

        for batch_number, start in enumerate(range(0, len(images), batch_size), start=1):
            batch_images = images[start:start + batch_size]
            batch_names = file_names[start:start + batch_size]

            if progress_callback:
                progress_callback(
                    batch_number / total_batches,
                    f"Processing batch {batch_number}/{total_batches}"
                )

            results.extend(self._process_batch(batch_images, batch_names, selected_models))

        return results

    def _process_batch(
        self,
        images: List[Image.Image],
        file_names: List[str],
        selected_models: List[str]
    ) -> List[EvaluationResult]:
        """Score one batch and return its results with final scores filled in."""
        batch_results = [
            EvaluationResult(file_name=name, image=img)
            for img, name in zip(images, file_names)
        ]

        # Model key -> evaluator; the key is also the result attribute name.
        evaluators = (
            ('aesthetic_shadow', self._eval_aesthetic_shadow),
            ('waifu_scorer', self._eval_waifu_scorer),
            ('aesthetic_v2_5', self._eval_aesthetic_v2_5),
            ('anime_aesthetic', self._eval_anime_aesthetic),
        )
        for key, evaluate in evaluators:
            if key in selected_models:
                for result, score in zip(batch_results, evaluate(images)):
                    setattr(result, key, score)

        for result in batch_results:
            result.final_score = self._calculate_final_score(result, selected_models)

        return batch_results

    def _eval_aesthetic_shadow(self, images: List[Image.Image]) -> List[Optional[float]]:
        """Return the 'hq' class probability scaled to 0-10 for each image.

        Yields ``None`` for an image whose prediction has no usable 'hq'
        entry, and ``None`` for every image if the model is missing or the
        whole call fails.
        """
        if not self.loader.models.get('aesthetic_shadow'):
            return [None] * len(images)

        try:
            results = self.loader.models['aesthetic_shadow'](images)
            scores = []
            for result in results:
                try:
                    hq_score = next(p for p in result if p['label'] == 'hq')['score']
                    scores.append(float(np.clip(hq_score * 10.0, 0.0, 10.0)))
                except Exception:
                    # Best effort per image; unlike a bare except this lets
                    # KeyboardInterrupt/SystemExit propagate.
                    scores.append(None)
            return scores
        except Exception as e:
            print(f"Error in Aesthetic Shadow: {e}")
            return [None] * len(images)

    def _eval_waifu_scorer(self, images: List[Image.Image]) -> List[Optional[float]]:
        """Return Waifu Scorer predictions clamped to 0-10 for each image.

        Images are encoded by the CLIP model, L2-normalised, and passed to
        the MLP head. Returns ``None`` for every image if the model is
        missing or the call fails.
        """
        model_dict = self.loader.models.get('waifu_scorer')
        if not model_dict:
            return [None] * len(images)

        try:
            with torch.no_grad():
                image_tensors = [model_dict['preprocess'](img).unsqueeze(0) for img in images]
                if len(image_tensors) == 1:
                    # A lone image is duplicated so the batch has two rows;
                    # the extra score is dropped by the slice on return.
                    image_tensors = image_tensors * 2

                image_batch = torch.cat(image_tensors).to(self.loader.device)
                image_features = model_dict['clip_model'].encode_image(image_batch)

                norm = image_features.norm(2, dim=-1, keepdim=True)
                norm[norm == 0] = 1  # avoid dividing by zero for all-zero features

                # The encoder may emit a different dtype than the MLP weights
                # (e.g. half precision); a mismatch would fail inside the MLP.
                mlp = model_dict['mlp']
                mlp_param = next(mlp.parameters())
                im_emb = (image_features / norm).to(
                    device=mlp_param.device, dtype=mlp_param.dtype
                )

                predictions = mlp(im_emb)
                scores = predictions.clamp(0, 10).cpu().numpy().flatten().tolist()

            return scores[:len(images)]
        except Exception as e:
            print(f"Error in Waifu Scorer: {e}")
            return [None] * len(images)

    def _eval_aesthetic_v2_5(self, images: List[Image.Image]) -> List[Optional[float]]:
        """Return Aesthetic Predictor V2.5 logits clipped to 0-10 for each image.

        Returns ``None`` for every image if the model is missing or the call fails.
        """
        model_dict = self.loader.models.get('aesthetic_v2_5')
        if not model_dict:
            return [None] * len(images)

        try:
            model = model_dict['model']
            rgb_images = [img.convert("RGB") for img in images]
            pixel_values = model_dict['preprocessor'](
                images=rgb_images, return_tensors="pt"
            ).pixel_values

            # Follow wherever the model actually lives instead of re-deriving
            # device/dtype from CUDA availability.
            # Assumes the model is a torch.nn.Module exposing parameters().
            reference = next(model.parameters())
            pixel_values = pixel_values.to(device=reference.device, dtype=reference.dtype)

            with torch.inference_mode():
                logits = model(pixel_values).logits

            # Flatten to one value per image; also covers a single-image batch.
            scores = logits.float().cpu().numpy().reshape(-1)
            return [float(np.clip(s, 0.0, 10.0)) for s in scores.tolist()]
        except Exception as e:
            print(f"Error in Aesthetic V2.5: {e}")
            return [None] * len(images)

    def _eval_anime_aesthetic(self, images: List[Image.Image]) -> List[Optional[float]]:
        """Return the Anime Aesthetic prediction scaled to 0-10 for each image.

        Each image is resized to fit a 768x768 zero-filled canvas with its
        aspect ratio preserved, centred, and fed to the ONNX session as a
        ``(1, 3, 768, 768)`` float32 array. An image that fails yields ``None``.
        """
        model = self.loader.models.get('anime_aesthetic')
        if not model:
            return [None] * len(images)

        s = 768  # side length of the square model input
        scores = []
        for img in images:
            try:
                # Force three channels so the array fits the RGB canvas below.
                img_np = np.array(img.convert("RGB")).astype(np.float32) / 255.0
                h, w = img_np.shape[:2]

                # Scale the longer side to ``s``; keep the shorter side >= 1 px
                # so extreme aspect ratios do not yield an empty resize target.
                if h > w:
                    new_h, new_w = s, max(1, int(s * w / h))
                else:
                    new_h, new_w = max(1, int(s * h / w)), s

                resized = cv2.resize(img_np, (new_w, new_h))
                canvas = np.zeros((s, s, 3), dtype=np.float32)

                pad_h = (s - new_h) // 2
                pad_w = (s - new_w) // 2
                canvas[pad_h:pad_h + new_h, pad_w:pad_w + new_w] = resized

                # HWC -> CHW, plus a leading batch axis.
                input_tensor = np.transpose(canvas, (2, 0, 1))[np.newaxis, :]
                pred = model.run(None, {"img": input_tensor})[0].item()
                scores.append(float(np.clip(pred * 10.0, 0.0, 10.0)))
            except Exception as e:
                print(f"Error processing image: {e}")
                scores.append(None)

        return scores

    def _calculate_final_score(self, result: EvaluationResult, selected_models: List[str]) -> Optional[float]:
        """Return the mean of the selected model scores that are not ``None``.

        Returns ``None`` when no selected model produced a score.
        """
        scores = [
            score
            for score in (getattr(result, model, None) for model in selected_models)
            if score is not None
        ]
        return float(np.mean(scores)) if scores else None

    def results_to_dataframe(self, results: List[EvaluationResult]) -> pd.DataFrame:
        """Convert results to a DataFrame with one row per image.

        A model column appears in a row only when that score is not ``None``.
        For empty input the frame still carries the ``'File Name'`` and
        ``'Final Score'`` columns, so sorting by final score remains valid.
        """
        optional_columns = (
            ('Aesthetic Shadow', 'aesthetic_shadow'),
            ('Waifu Scorer', 'waifu_scorer'),
            ('Aesthetic V2.5', 'aesthetic_v2_5'),
            ('Anime Aesthetic', 'anime_aesthetic'),
        )
        data = []
        for result in results:
            row = {
                'File Name': result.file_name,
                'Final Score': result.final_score,
            }
            for column, attribute in optional_columns:
                value = getattr(result, attribute)
                if value is not None:
                    row[column] = value
            data.append(row)

        if not data:
            return pd.DataFrame(columns=['File Name', 'Final Score'])
        return pd.DataFrame(data)

    def optimize_batch_size(self, sample_images: List[Image.Image]) -> int:
        """Probe for a workable batch size by doubling until a run fails.

        The first sample image is repeated to build each test batch, which is
        run through the Aesthetic Shadow pipeline when that model is loaded.
        Probing stops at ``min(16, len(sample_images))`` and the result is
        capped at 8.

        Returns:
            The last batch size that did not raise (at least 1, at most 8);
            1 when ``sample_images`` is empty.
        """
        if not sample_images:
            return 1

        test_image = sample_images[0]
        batch_size = 1
        max_test = min(16, len(sample_images))

        while batch_size <= max_test:
            try:
                test_batch = [test_image] * batch_size
                if self.loader.models.get('aesthetic_shadow'):
                    self.loader.models['aesthetic_shadow'](test_batch)
                batch_size *= 2
            except Exception:
                # Treat any failure (e.g. out of memory) as the upper limit.
                break

        # batch_size is now one doubling past the last size that worked.
        optimal = max(1, batch_size // 2)
        return min(optimal, 8)
|
|
|
|
|
def create_interface():
    """Build the Gradio Blocks application for scoring uploaded images.

    The UI offers file upload, model selection, automatic or manual batch
    size, a results table sorted by final score, and CSV export.

    Returns:
        The configured ``gr.Blocks`` application (not yet launched).
    """
    evaluator = ImageEvaluator()

    # (label shown in the UI, model key) pairs.
    model_choices = [
        ("Aesthetic Shadow", "aesthetic_shadow"),
        ("Waifu Scorer", "waifu_scorer"),
        ("Aesthetic V2.5", "aesthetic_v2_5"),
        ("Anime Aesthetic", "anime_aesthetic")
    ]
    available_models = [choice[1] for choice in model_choices]

    with gr.Blocks(title="Image Evaluation Tool", theme=gr.themes.Soft()) as app:
        gr.Markdown("""
        # π¨ Modern Image Evaluation Tool

        Upload images to evaluate them using state-of-the-art aesthetic and quality prediction models.

        **Features:**
        - Multiple AI models for comprehensive evaluation
        - Batch processing with automatic optimization
        - Interactive results table with sorting and filtering
        - CSV export functionality
        - Real-time progress tracking
        """)

        with gr.Row():
            # Left column: inputs and controls.
            with gr.Column(scale=1):
                input_files = gr.File(
                    label="π Upload Images",
                    file_count="multiple",
                    file_types=["image"]
                )

                model_selection = gr.CheckboxGroup(
                    choices=model_choices,
                    value=available_models,
                    label="π€ Select Models",
                    info="Choose which models to use for evaluation"
                )

                with gr.Row():
                    auto_batch = gr.Checkbox(
                        label="π Auto Batch Size",
                        value=True,
                        info="Automatically optimize batch size"
                    )

                    manual_batch = gr.Slider(
                        minimum=1,
                        maximum=16,
                        value=4,
                        step=1,
                        label="π Batch Size",
                        interactive=False,
                        info="Manual batch size (when auto is disabled)"
                    )

                evaluate_btn = gr.Button(
                    "π Evaluate Images",
                    variant="primary",
                    size="lg"
                )

                clear_btn = gr.Button("ποΈ Clear Results", variant="secondary")

            # Right column: status and results.
            with gr.Column(scale=2):
                status_text = gr.Textbox(
                    label="π Status",
                    interactive=False,
                    max_lines=2
                )

                results_table = gr.DataFrame(
                    label="π Evaluation Results",
                    interactive=False,
                    wrap=True,
                    max_height=400
                )

                with gr.Row():
                    export_csv = gr.Button("π₯ Export CSV", variant="secondary")
                    download_file = gr.File(
                        label="πΎ Download",
                        visible=False
                    )

        # Holds the EvaluationResult list of the latest run between events.
        results_state = gr.State([])

        def toggle_batch_slider(auto_enabled):
            """Make the manual slider editable only while auto batching is off."""
            return gr.update(interactive=not auto_enabled)

        def process_images(files, models, auto_batch_enabled, manual_batch_size, progress=gr.Progress()):
            """Load the uploads, score them, and return (status, table, results)."""
            if not files or not models:
                return "β Please upload images and select at least one model", pd.DataFrame(), []

            try:
                images = []
                file_names = []

                progress(0.1, "π Loading images...")

                for file in files:
                    # Entries may be plain path strings or objects exposing the path as ``.name``.
                    path = getattr(file, "name", file)
                    try:
                        # The context manager releases the file handle once
                        # the converted copy has been made.
                        with Image.open(path) as source:
                            img = source.convert("RGB")
                        images.append(img)
                        file_names.append(os.path.basename(path))
                    except Exception as e:
                        print(f"Error loading {path}: {e}")

                if not images:
                    return "β No valid images loaded", pd.DataFrame(), []

                if auto_batch_enabled:
                    # NOTE(review): the optimizer never probes beyond the
                    # sample length, so a two-image sample caps the result at 2.
                    batch_size = evaluator.optimize_batch_size(images[:2])
                    progress(0.2, f"π§ Optimized batch size: {batch_size}")
                else:
                    batch_size = int(manual_batch_size)

                def progress_callback(prog, msg):
                    # Map evaluation progress onto the 20%-90% span of the bar.
                    progress(0.2 + prog * 0.7, msg)

                results = evaluator.evaluate_images(
                    images, file_names, models, batch_size, progress_callback
                )

                progress(0.95, "π Generating results table...")

                df = evaluator.results_to_dataframe(results)
                df = df.sort_values('Final Score', ascending=False, na_position='last')

                progress(1.0, f"✅ Processed {len(results)} images successfully!")

                return f"✅ Evaluated {len(results)} images using {len(models)} models", df, results

            except Exception as e:
                return f"β Error during processing: {str(e)}", pd.DataFrame(), []

        def update_results_table(models, current_results):
            """Recompute final scores for the current model selection and re-sort."""
            if not current_results:
                return pd.DataFrame()

            for result in current_results:
                result.final_score = evaluator._calculate_final_score(result, models)

            df = evaluator.results_to_dataframe(current_results)
            return df.sort_values('Final Score', ascending=False, na_position='last')

        def export_results(current_results):
            """Write the current results to a CSV file and reveal the download widget."""
            if not current_results:
                return gr.update(visible=False)

            df = evaluator.results_to_dataframe(current_results)
            csv_path = evaluator.temp_dir / "evaluation_results.csv"
            df.to_csv(csv_path, index=False)

            return gr.update(value=str(csv_path), visible=True)

        def clear_all():
            """Reset the status, table, stored results and download widget."""
            return (
                "π Ready for new evaluation",
                pd.DataFrame(),
                [],
                gr.update(visible=False)
            )

        auto_batch.change(
            toggle_batch_slider,
            inputs=[auto_batch],
            outputs=[manual_batch]
        )

        evaluate_btn.click(
            process_images,
            inputs=[input_files, model_selection, auto_batch, manual_batch],
            outputs=[status_text, results_table, results_state]
        )

        model_selection.change(
            update_results_table,
            inputs=[model_selection, results_state],
            outputs=[results_table]
        )

        export_csv.click(
            export_results,
            inputs=[results_state],
            outputs=[download_file]
        )

        clear_btn.click(
            clear_all,
            outputs=[status_text, results_table, results_state, download_file]
        )

        # Route the initial message to the status box; without ``outputs``
        # the returned text is never displayed.
        app.load(
            lambda: "π Ready for evaluation - Upload images to get started!",
            outputs=[status_text]
        )

    return app
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: build the UI, enable the request queue, then serve.
    launch_options = dict(
        server_name="0.0.0.0",
        server_port=7860,
        share=False,
        show_error=True,
    )
    app = create_interface()
    queued_app = app.queue(max_size=10)
    queued_app.launch(**launch_options)