# CIET / app.py
# VOIDER's picture
# Update app.py
# 8b461d6 verified
# raw
# history blame
# 22.3 kB
import os
import tempfile
import base64
from io import BytesIO
from typing import List, Dict, Any, Optional, Tuple
from dataclasses import dataclass
from pathlib import Path
import cv2
import numpy as np
import torch
import onnxruntime as rt
from PIL import Image
import gradio as gr
import pandas as pd
from transformers import pipeline
from huggingface_hub import hf_hub_download
# Import necessary function from aesthetic_predictor_v2_5
from aesthetic_predictor_v2_5 import convert_v2_5_from_siglip
@dataclass
class EvaluationResult:
    """Scores gathered for a single evaluated image.

    Every per-model field starts as ``None`` and is only replaced once the
    corresponding model has produced a score for this image.
    """

    # Base name of the file the image came from.
    file_name: str
    # The image the scores refer to.
    image: Image.Image

    # Per-model scores; ``None`` means "not evaluated" or "evaluation failed".
    aesthetic_shadow: Optional[float] = None
    waifu_scorer: Optional[float] = None
    aesthetic_v2_5: Optional[float] = None
    anime_aesthetic: Optional[float] = None

    # Aggregate over the selected models' scores; ``None`` when there are none.
    final_score: Optional[float] = None
class MLP(torch.nn.Module):
    """Feed-forward regression head: feature vector in, one scalar out."""

    def __init__(self, input_size: int = 768):
        """Build the layer stack.

        Args:
            input_size: Width of the incoming feature vector.
        """
        super().__init__()
        nn = torch.nn
        # The position of each layer matters: Sequential names its children
        # by index, and those indices are part of the state-dict keys.
        stack = [
            nn.Linear(input_size, 1024),
            nn.ReLU(),
            nn.BatchNorm1d(1024),
            nn.Dropout(0.2),
            nn.Linear(1024, 256),
            nn.ReLU(),
            nn.BatchNorm1d(256),
            nn.Dropout(0.1),
            nn.Linear(256, 64),
            nn.ReLU(),
            nn.Linear(64, 1),
        ]
        self.network = nn.Sequential(*stack)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Run ``x`` through the layer stack and return the result."""
        prediction = self.network(x)
        return prediction
class ModelLoader:
    """Load every scoring model once and expose them through ``self.models``.

    ``self.models`` maps each model key (``'aesthetic_shadow'``,
    ``'waifu_scorer'``, ``'aesthetic_v2_5'``, ``'anime_aesthetic'``) to the
    loaded object, or to ``None`` when loading that model failed. Each loader
    handles its own errors, so one failure never prevents the remaining
    models from loading.
    """

    # Keys written into ``self.models``, in load order.
    MODEL_KEYS = ('aesthetic_shadow', 'waifu_scorer', 'aesthetic_v2_5', 'anime_aesthetic')

    def __init__(self, device: str = None):
        """Choose the device and load all models immediately.

        Args:
            device: Torch device string. When omitted, ``'cuda'`` is used if
                CUDA is available, otherwise ``'cpu'``.
        """
        self.device = device or ('cuda' if torch.cuda.is_available() else 'cpu')
        self.models = {}
        self._load_all_models()

    def _load_all_models(self):
        """Run every loader, then report which models are usable."""
        loaders = (
            self._load_aesthetic_shadow,
            self._load_waifu_scorer,
            self._load_aesthetic_v2_5,
            self._load_anime_aesthetic,
        )
        for load in loaders:
            load()

        # Loaders record failures as ``None``; only claim success when every
        # model is really there.
        unavailable = [key for key in self.MODEL_KEYS if self.models.get(key) is None]
        if unavailable:
            print(f"⚠️ Models not available: {', '.join(unavailable)}")
        else:
            print("✅ All models loaded successfully!")

    def _load_aesthetic_shadow(self):
        """Load the Aesthetic Shadow image-classification pipeline."""
        print("🔄 Loading Aesthetic Shadow...")
        try:
            self.models['aesthetic_shadow'] = pipeline(
                "image-classification",
                model="NeoChen1024/aesthetic-shadow-v2-backup",
                device=self.device
            )
        except Exception as e:
            # Same best-effort policy as the other loaders: record the model
            # as missing and carry on.
            print(f"⚠️ Aesthetic Shadow not available: {e}")
            self.models['aesthetic_shadow'] = None

    def _load_waifu_scorer(self):
        """Load the Waifu Scorer MLP head together with its CLIP encoder.

        On success ``self.models['waifu_scorer']`` is a dict with the keys
        ``'mlp'``, ``'clip_model'`` and ``'preprocess'``.
        """
        print("🔄 Loading Waifu Scorer...")
        try:
            # Imported lazily so a missing ``clip`` package only disables
            # this one model.
            import clip

            # Load MLP
            model_path = hf_hub_download("Eugeoter/waifu-scorer-v3", "model.pth")
            mlp = MLP()
            # NOTE(review): torch.load unpickles the downloaded checkpoint;
            # consider ``weights_only=True`` if the installed torch version
            # supports it for this file.
            state_dict = torch.load(model_path, map_location=self.device)
            mlp.load_state_dict(state_dict)
            mlp.to(self.device).eval()

            # Load CLIP
            clip_model, preprocess = clip.load("ViT-L/14", device=self.device)

            self.models['waifu_scorer'] = {
                'mlp': mlp,
                'clip_model': clip_model,
                'preprocess': preprocess
            }
        except Exception as e:
            print(f"⚠️ Waifu Scorer not available: {e}")
            self.models['waifu_scorer'] = None

    def _load_aesthetic_v2_5(self):
        """Load Aesthetic Predictor V2.5 (model plus image preprocessor).

        On success ``self.models['aesthetic_v2_5']`` is a dict with the keys
        ``'model'`` and ``'preprocessor'``.
        """
        print("🔄 Loading Aesthetic V2.5...")
        try:
            model, preprocessor = convert_v2_5_from_siglip(
                low_cpu_mem_usage=True,
                trust_remote_code=True,
            )
            # Placement follows CUDA availability (not ``self.device``): when
            # CUDA is present the model runs there in bfloat16.
            if torch.cuda.is_available():
                model = model.to(torch.bfloat16).cuda()

            self.models['aesthetic_v2_5'] = {
                'model': model,
                'preprocessor': preprocessor
            }
        except Exception as e:
            print(f"⚠️ Aesthetic V2.5 not available: {e}")
            self.models['aesthetic_v2_5'] = None

    def _load_anime_aesthetic(self):
        """Load the Anime Aesthetic ONNX model as a CPU inference session."""
        print("🔄 Loading Anime Aesthetic...")
        try:
            model_path = hf_hub_download("skytnt/anime-aesthetic", "model.onnx")
            self.models['anime_aesthetic'] = rt.InferenceSession(
                model_path,
                providers=['CPUExecutionProvider']
            )
        except Exception as e:
            print(f"⚠️ Anime Aesthetic not available: {e}")
            self.models['anime_aesthetic'] = None
class ImageEvaluator:
    """Score images with the loaded models, one batch at a time."""

    # (model key / EvaluationResult attribute, DataFrame column title)
    _SCORE_COLUMNS = (
        ('aesthetic_shadow', 'Aesthetic Shadow'),
        ('waifu_scorer', 'Waifu Scorer'),
        ('aesthetic_v2_5', 'Aesthetic V2.5'),
        ('anime_aesthetic', 'Anime Aesthetic'),
    )

    def __init__(self):
        """Load all models and create a temporary working directory."""
        self.loader = ModelLoader()
        self.temp_dir = Path(tempfile.mkdtemp())

    def evaluate_images(
        self,
        images: List[Image.Image],
        file_names: List[str],
        selected_models: List[str],
        batch_size: int = 4,
        progress_callback=None
    ) -> List[EvaluationResult]:
        """Evaluate images using selected models.

        Args:
            images: Images to score.
            file_names: Name for each image, in the same order as ``images``.
            selected_models: Model keys to run.
            batch_size: Images per batch; values below 1 are treated as 1.
            progress_callback: Optional ``callback(fraction, message)`` that
                is invoked once per batch, before the batch is processed.

        Returns:
            One ``EvaluationResult`` per image, in input order.
        """
        # A non-positive batch size would otherwise divide by zero / give
        # ``range`` a zero step.
        batch_size = max(1, int(batch_size))

        results = []
        total_batches = (len(images) + batch_size - 1) // batch_size

        for batch_number, start in enumerate(range(0, len(images), batch_size), start=1):
            batch_images = images[start:start + batch_size]
            batch_names = file_names[start:start + batch_size]

            # Update progress
            if progress_callback:
                progress_callback(
                    batch_number / total_batches,
                    f"Processing batch {batch_number}/{total_batches}"
                )

            # Process batch
            results.extend(self._process_batch(batch_images, batch_names, selected_models))

        return results

    def _process_batch(
        self,
        images: List[Image.Image],
        file_names: List[str],
        selected_models: List[str]
    ) -> List[EvaluationResult]:
        """Run every selected model over one batch and combine the scores."""
        batch_results = [
            EvaluationResult(file_name=name, image=img)
            for img, name in zip(images, file_names)
        ]

        evaluators = (
            ('aesthetic_shadow', self._eval_aesthetic_shadow),
            ('waifu_scorer', self._eval_waifu_scorer),
            ('aesthetic_v2_5', self._eval_aesthetic_v2_5),
            ('anime_aesthetic', self._eval_anime_aesthetic),
        )
        for key, evaluate in evaluators:
            if key not in selected_models:
                continue
            # The result attribute carries the same name as the model key.
            for result, score in zip(batch_results, evaluate(images)):
                setattr(result, key, score)

        # Calculate final scores
        for result in batch_results:
            result.final_score = self._calculate_final_score(result, selected_models)

        return batch_results

    def _eval_aesthetic_shadow(self, images: List[Image.Image]) -> List[Optional[float]]:
        """Score images with Aesthetic Shadow.

        The score is the ``'hq'`` label probability times 10, clipped to
        ``[0, 10]``. Entries are ``None`` when the model is unavailable, the
        pipeline call fails, or a prediction has no usable ``'hq'`` entry.
        """
        classifier = self.loader.models.get('aesthetic_shadow')
        if not classifier:
            return [None] * len(images)

        try:
            predictions = classifier(images)
        except Exception as e:
            print(f"Error in Aesthetic Shadow: {e}")
            return [None] * len(images)

        scores = []
        for prediction in predictions:
            try:
                hq_score = next(p for p in prediction if p['label'] == 'hq')['score']
                scores.append(float(np.clip(hq_score * 10.0, 0.0, 10.0)))
            except (StopIteration, KeyError, TypeError, ValueError):
                # No 'hq' label or a malformed entry: this image gets no score.
                scores.append(None)
        return scores

    def _eval_waifu_scorer(self, images: List[Image.Image]) -> List[Optional[float]]:
        """Score images with Waifu Scorer (CLIP features fed to the MLP head).

        Scores are clamped to ``[0, 10]``. Returns ``None`` for every image
        when the model is unavailable or inference fails.
        """
        model_dict = self.loader.models.get('waifu_scorer')
        if not model_dict or not images:
            return [None] * len(images)

        try:
            mlp = model_dict['mlp']
            with torch.no_grad():
                # Preprocess images
                image_tensors = [model_dict['preprocess'](img).unsqueeze(0) for img in images]
                if len(image_tensors) == 1:
                    # Kept from the original ("CLIP requirement"): a lone
                    # image is duplicated; the extra score is dropped below.
                    image_tensors = image_tensors * 2

                image_batch = torch.cat(image_tensors).to(self.loader.device)
                image_features = model_dict['clip_model'].encode_image(image_batch)

                # Normalize features
                norm = image_features.norm(2, dim=-1, keepdim=True)
                norm[norm == 0] = 1

                # The CLIP encoder may emit a different dtype than the MLP
                # weights (e.g. half precision on GPU); align them so the
                # linear layers accept the embeddings.
                mlp_dtype = next(mlp.parameters()).dtype
                im_emb = (image_features / norm).to(device=self.loader.device, dtype=mlp_dtype)

                predictions = mlp(im_emb)
                scores = predictions.clamp(0, 10).cpu().numpy().flatten().tolist()

            return scores[:len(images)]
        except Exception as e:
            print(f"Error in Waifu Scorer: {e}")
            return [None] * len(images)

    def _eval_aesthetic_v2_5(self, images: List[Image.Image]) -> List[Optional[float]]:
        """Score images with Aesthetic Predictor V2.5.

        Scores are clipped to ``[0, 10]``. Returns ``None`` for every image
        when the model is unavailable or inference fails.
        """
        model_dict = self.loader.models.get('aesthetic_v2_5')
        if not model_dict:
            return [None] * len(images)

        try:
            rgb_images = [img.convert("RGB") for img in images]
            pixel_values = model_dict['preprocessor'](images=rgb_images, return_tensors="pt").pixel_values

            # Mirrors how the model is placed when it is loaded.
            if torch.cuda.is_available():
                pixel_values = pixel_values.to(torch.bfloat16).cuda()

            with torch.inference_mode():
                scores = model_dict['model'](pixel_values).logits.squeeze().float().cpu().numpy()

            # ``squeeze`` collapses a single-image batch to a 0-d array.
            scores = np.atleast_1d(scores)
            return [float(np.clip(s, 0.0, 10.0)) for s in scores.tolist()]
        except Exception as e:
            print(f"Error in Aesthetic V2.5: {e}")
            return [None] * len(images)

    def _eval_anime_aesthetic(self, images: List[Image.Image]) -> List[Optional[float]]:
        """Score images with the Anime Aesthetic ONNX model.

        Each image is converted to RGB, scaled so its longer side is 768 px,
        centred on a black 768x768 canvas and fed to the session as a
        ``(1, 3, 768, 768)`` float32 array. The prediction times 10 is
        clipped to ``[0, 10]``. A failing image yields ``None`` without
        affecting the others.
        """
        model = self.loader.models.get('anime_aesthetic')
        if not model:
            return [None] * len(images)

        size = 768
        scores = []
        for img in images:
            try:
                # Preprocess image. The canvas has exactly three channels, so
                # grayscale / RGBA inputs must be converted first.
                img_np = np.array(img.convert("RGB")).astype(np.float32) / 255.0
                h, w = img_np.shape[:2]

                # Keep the aspect ratio; never let the short side reach 0,
                # which the resize would reject.
                if h > w:
                    new_h, new_w = size, max(1, int(size * w / h))
                else:
                    new_h, new_w = max(1, int(size * h / w)), size

                resized = cv2.resize(img_np, (new_w, new_h))
                canvas = np.zeros((size, size, 3), dtype=np.float32)

                pad_h = (size - new_h) // 2
                pad_w = (size - new_w) // 2
                canvas[pad_h:pad_h + new_h, pad_w:pad_w + new_w] = resized

                # HWC -> CHW, plus a leading batch axis.
                input_tensor = np.transpose(canvas, (2, 0, 1))[np.newaxis, :]
                pred = model.run(None, {"img": input_tensor})[0].item()
                scores.append(float(np.clip(pred * 10.0, 0.0, 10.0)))
            except Exception as e:
                print(f"Error processing image: {e}")
                scores.append(None)

        return scores

    def _calculate_final_score(self, result: EvaluationResult, selected_models: List[str]) -> Optional[float]:
        """Return the mean of the selected models' scores on ``result``.

        Models without a score (or unknown attribute names) are ignored;
        ``None`` is returned when no selected model has a score.
        """
        scores = [
            score
            for score in (getattr(result, model, None) for model in selected_models)
            if score is not None
        ]
        return float(np.mean(scores)) if scores else None

    def results_to_dataframe(self, results: List[EvaluationResult]) -> pd.DataFrame:
        """Convert results to a pandas DataFrame.

        Every row has ``'File Name'`` and ``'Final Score'``; a per-model
        column is only written for rows where that model produced a score.
        """
        rows = []
        for result in results:
            row = {
                'File Name': result.file_name,
                'Final Score': result.final_score,
            }
            for attr, column in self._SCORE_COLUMNS:
                score = getattr(result, attr)
                if score is not None:
                    row[column] = score
            rows.append(row)

        return pd.DataFrame(rows)

    def optimize_batch_size(self, sample_images: List[Image.Image]) -> int:
        """Probe for a workable batch size by doubling until a run fails.

        The first sample image is repeated to build each test batch, which is
        run through the Aesthetic Shadow pipeline when that model is loaded.
        Probing stops at ``min(16, len(sample_images))``, so the number of
        samples passed in bounds the result; the returned value is at least 1
        and at most 8.
        """
        if not sample_images:
            return 1

        test_image = sample_images[0]
        batch_size = 1
        max_test = min(16, len(sample_images))
        probe = self.loader.models.get('aesthetic_shadow')

        while batch_size <= max_test:
            try:
                test_batch = [test_image] * batch_size
                # Test with a lightweight model
                if probe:
                    probe(test_batch)
                batch_size *= 2
            except Exception:
                # Treat any failure (e.g. running out of memory) as the limit.
                break

        # ``batch_size`` is the first size that failed or was not tried.
        optimal = max(1, batch_size // 2)
        return min(optimal, 8)  # Cap at reasonable size
def create_interface():
    """Create the Gradio interface.

    Builds the evaluator (which loads all models), lays out the upload /
    model-selection controls and the results area, and wires the event
    handlers.

    Returns:
        The assembled ``gr.Blocks`` app, not yet launched.
    """
    evaluator = ImageEvaluator()

    # Available models as (label shown in the UI, model key) pairs.
    model_choices = [
        ("Aesthetic Shadow", "aesthetic_shadow"),
        ("Waifu Scorer", "waifu_scorer"),
        ("Aesthetic V2.5", "aesthetic_v2_5"),
        ("Anime Aesthetic", "anime_aesthetic")
    ]
    available_models = [value for _, value in model_choices]

    intro_markdown = (
        "\n"
        "# 🎨 Modern Image Evaluation Tool\n"
        "Upload images to evaluate them using state-of-the-art aesthetic and quality prediction models.\n"
        "**Features:**\n"
        "- Multiple AI models for comprehensive evaluation\n"
        "- Batch processing with automatic optimization\n"
        "- Interactive results table with sorting and filtering\n"
        "- CSV export functionality\n"
        "- Real-time progress tracking\n"
    )

    with gr.Blocks(title="Image Evaluation Tool", theme=gr.themes.Soft()) as app:
        gr.Markdown(intro_markdown)

        with gr.Row():
            with gr.Column(scale=1):
                # Input components
                input_files = gr.File(
                    label="📁 Upload Images",
                    file_count="multiple",
                    file_types=["image"]
                )
                model_selection = gr.CheckboxGroup(
                    choices=model_choices,
                    value=available_models,
                    label="🤖 Select Models",
                    info="Choose which models to use for evaluation"
                )
                with gr.Row():
                    auto_batch = gr.Checkbox(
                        label="🔄 Auto Batch Size",
                        value=True,
                        info="Automatically optimize batch size"
                    )
                    manual_batch = gr.Slider(
                        minimum=1,
                        maximum=16,
                        value=4,
                        step=1,
                        label="📊 Batch Size",
                        interactive=False,
                        info="Manual batch size (when auto is disabled)"
                    )
                evaluate_btn = gr.Button(
                    "🚀 Evaluate Images",
                    variant="primary",
                    size="lg"
                )
                clear_btn = gr.Button("🗑️ Clear Results", variant="secondary")

            with gr.Column(scale=2):
                # Status. Progress is reported through the ``gr.Progress``
                # default argument of ``process_images``; no layout component
                # is needed for it.
                status_text = gr.Textbox(
                    label="📊 Status",
                    interactive=False,
                    max_lines=2
                )
                # Results display
                results_table = gr.DataFrame(
                    label="📋 Evaluation Results",
                    interactive=False,
                    wrap=True,
                    max_height=400
                )
                # Export functionality
                with gr.Row():
                    export_csv = gr.Button("📥 Export CSV", variant="secondary")
                    download_file = gr.File(
                        label="💾 Download",
                        visible=False
                    )

        # State management: the list of results from the last evaluation.
        results_state = gr.State([])

        # Event handlers
        def toggle_batch_slider(auto_enabled):
            """Enable the manual slider only while auto batching is off."""
            return gr.update(interactive=not auto_enabled)

        def process_images(files, models, auto_batch_enabled, manual_batch_size, progress=gr.Progress()):
            """Load the uploads, score them, and return (status, table, results)."""
            if not files or not models:
                return "❌ Please upload images and select at least one model", pd.DataFrame(), []

            try:
                # Load images
                images = []
                file_names = []
                progress(0.1, "📂 Loading images...")

                for file in files:
                    # Uploads may arrive as plain path strings or as objects
                    # exposing the path through ``.name``.
                    path = getattr(file, "name", file)
                    try:
                        # ``convert`` returns a fully loaded copy, so the
                        # underlying file can be closed straight away.
                        with Image.open(path) as handle:
                            img = handle.convert("RGB")
                    except Exception as e:
                        print(f"Error loading {path}: {e}")
                        continue
                    images.append(img)
                    file_names.append(os.path.basename(path))

                if not images:
                    return "❌ No valid images loaded", pd.DataFrame(), []

                # Determine batch size
                if auto_batch_enabled:
                    batch_size = evaluator.optimize_batch_size(images[:2])
                    progress(0.2, f"🔧 Optimized batch size: {batch_size}")
                else:
                    batch_size = int(manual_batch_size)

                # Process images; evaluation occupies the 20%-90% span of
                # the progress bar.
                def progress_callback(prog, msg):
                    progress(0.2 + prog * 0.7, msg)

                results = evaluator.evaluate_images(
                    images, file_names, models, batch_size, progress_callback
                )

                progress(0.95, "📊 Generating results table...")

                # Convert to DataFrame, best score first.
                df = evaluator.results_to_dataframe(results)
                df = df.sort_values('Final Score', ascending=False, na_position='last')

                progress(1.0, f"✅ Processed {len(results)} images successfully!")
                return f"✅ Evaluated {len(results)} images using {len(models)} models", df, results
            except Exception as e:
                return f"❌ Error during processing: {str(e)}", pd.DataFrame(), []

        def update_results_table(models, current_results):
            """Recompute final scores for the current model selection."""
            if not current_results:
                return pd.DataFrame()

            # Recalculate final scores based on selected models
            for result in current_results:
                result.final_score = evaluator._calculate_final_score(result, models)

            df = evaluator.results_to_dataframe(current_results)
            return df.sort_values('Final Score', ascending=False, na_position='last')

        def export_results(current_results):
            """Write the current results to CSV and reveal the download box."""
            if not current_results:
                return gr.update(visible=False)

            df = evaluator.results_to_dataframe(current_results)
            csv_path = evaluator.temp_dir / "evaluation_results.csv"
            df.to_csv(csv_path, index=False)
            return gr.update(value=str(csv_path), visible=True)

        def clear_all():
            """Reset status, table, stored results and the download box."""
            return (
                "🔄 Ready for new evaluation",
                pd.DataFrame(),
                [],
                gr.update(visible=False)
            )

        # Wire up events
        auto_batch.change(
            toggle_batch_slider,
            inputs=[auto_batch],
            outputs=[manual_batch]
        )
        evaluate_btn.click(
            process_images,
            inputs=[input_files, model_selection, auto_batch, manual_batch],
            outputs=[status_text, results_table, results_state]
        )
        model_selection.change(
            update_results_table,
            inputs=[model_selection, results_state],
            outputs=[results_table]
        )
        export_csv.click(
            export_results,
            inputs=[results_state],
            outputs=[download_file]
        )
        clear_btn.click(
            clear_all,
            outputs=[status_text, results_table, results_state, download_file]
        )

        # Initial setup: the message has to be routed to the status box,
        # otherwise the returned value is discarded.
        app.load(
            lambda: "🔄 Ready for evaluation - Upload images to get started!",
            outputs=[status_text]
        )

    return app
if __name__ == "__main__":
    # Script entry point: build the interface, turn on the request queue and
    # serve on all network interfaces at port 7860 without a public share
    # link, with errors shown in the UI.
    app = create_interface()
    queued_app = app.queue(max_size=10)
    queued_app.launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=False,
        show_error=True,
    )