import gc
import os
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Any, Dict, List, Optional

import cv2
import gradio as gr
import numpy as np
import onnxruntime as rt
import pandas as pd
import torch
from huggingface_hub import hf_hub_download
from PIL import Image
from tqdm import tqdm
from transformers import AutoModel, AutoProcessor, Pipeline, pipeline

# Allow arbitrarily large images (disables Pillow's decompression-bomb guard).
Image.MAX_IMAGE_PIXELS = None

CACHE_DIR = "./hf_cache"
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

# bfloat16 needs native hardware support (compute capability >= 8.0, i.e. Ampere or newer).
DTYPE = torch.bfloat16 if torch.cuda.is_available() and torch.cuda.get_device_capability()[0] >= 8 else torch.float32

print(f"Using device: {DEVICE} with dtype: {DTYPE}")
class AestheticScorer(ABC):
    """Abstract base class for all aesthetic scoring models."""

    def __init__(self, model_name: str, repo_id: str, filename: Optional[str] = None):
        self.model_name = model_name
        self.repo_id = repo_id
        self.filename = filename
        self._model = None
        print(f"Initializing scorer definition: {self.model_name}")

    @property
    def model(self):
        """Lazy-loads the model on first access."""
        if self._model is None:
            print(f"Loading model weights for '{self.model_name}'...")
            self._model = self.load_model()
            print(f"'{self.model_name}' model weights loaded.")
        return self._model

    def _download_model(self) -> str:
        """Downloads the model file from the Hugging Face Hub."""
        return hf_hub_download(repo_id=self.repo_id, filename=self.filename, cache_dir=CACHE_DIR)

    @abstractmethod
    def load_model(self) -> Any:
        """Loads the model and any necessary preprocessors."""

    @abstractmethod
    def score_batch(self, image_batch: List[Image.Image]) -> List[float]:
        """Scores a batch of images and returns one float per image."""

    def release_model(self):
        """Releases the model from memory to conserve VRAM/RAM."""
        if self._model is not None:
            print(f"Releasing model from memory: {self.model_name}")
            del self._model
            self._model = None
            gc.collect()
            if torch.cuda.is_available():
                torch.cuda.empty_cache()
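

# Concrete scorers: a transformers pipeline, a raw ONNX session, a CLIP
# backbone with an MLP regression head, and a SigLIP-based predictor.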
class PipelineScorer(AestheticScorer):
    """Scorer for models served through a Hugging Face `image-classification` pipeline."""

    def load_model(self) -> Pipeline:
        return pipeline("image-classification", model=self.repo_id, device=DEVICE)

    @torch.no_grad()
    def score_batch(self, image_batch: List[Image.Image]) -> List[float]:
        results = self.model(image_batch, top_k=None)
        scores = []
        for res in results:
            try:
                # Scale the probability of the 'hq' (high-quality) label to a 0-10 score.
                hq_score = next(item['score'] for item in res if item['label'] == 'hq')
                scores.append(round(hq_score * 10.0, 4))
            except (StopIteration, TypeError):
                scores.append(0.0)
        return scores


class ONNXScorer(AestheticScorer):
    """Scorer for ONNX-based models."""

    def load_model(self) -> rt.InferenceSession:
        model_path = self._download_model()
        provider = 'CUDAExecutionProvider' if DEVICE == 'cuda' else 'CPUExecutionProvider'
        return rt.InferenceSession(model_path, providers=[provider])

    def _preprocess(self, img: Image.Image) -> np.ndarray:
        """Letterboxes the image onto a 768x768 canvas and converts it to NCHW float32."""
        img_np = np.array(img.convert("RGB")).astype(np.float32) / 255.0
        s = 768
        h, w = img_np.shape[:2]
        ratio = s / max(h, w)
        new_h, new_w = int(h * ratio), int(w * ratio)

        # Resize so the longer side is 768 px, then center-pad the shorter side with black.
        resized = cv2.resize(img_np, (new_w, new_h), interpolation=cv2.INTER_AREA)
        canvas = np.zeros((s, s, 3), dtype=np.float32)
        pad_h, pad_w = (s - new_h) // 2, (s - new_w) // 2
        canvas[pad_h:pad_h + new_h, pad_w:pad_w + new_w] = resized

        # HWC -> CHW, plus a leading batch dimension.
        return np.transpose(canvas, (2, 0, 1))[np.newaxis, :]

    def score_batch(self, image_batch: List[Image.Image]) -> List[float]:
        scores = []
        for img in image_batch:
            try:
                input_tensor = self._preprocess(img)
                # The model emits a single quality value in [0, 1]; rescale to 0-10.
                pred = self.model.run(None, {"img": input_tensor})[0].item()
                scores.append(round(pred * 10.0, 4))
            except Exception:
                scores.append(0.0)
        return scores


class CLIPMLPScorer(AestheticScorer):
    """Scorer for models using a CLIP backbone and a custom MLP head."""

    class MLP(torch.nn.Module):
        """Re-implementation of the exact MLP from the original code."""

        def __init__(self, input_size: int):
            super().__init__()
            self.layers = torch.nn.Sequential(
                torch.nn.Linear(input_size, 2048),
                torch.nn.ReLU(),
                torch.nn.BatchNorm1d(2048),
                torch.nn.Dropout(0.3),
                torch.nn.Linear(2048, 512),
                torch.nn.ReLU(),
                torch.nn.BatchNorm1d(512),
                torch.nn.Dropout(0.3),
                torch.nn.Linear(512, 256),
                torch.nn.ReLU(),
                torch.nn.BatchNorm1d(256),
                torch.nn.Dropout(0.2),
                torch.nn.Linear(256, 128),
                torch.nn.ReLU(),
                torch.nn.BatchNorm1d(128),
                torch.nn.Dropout(0.1),
                torch.nn.Linear(128, 32),
                torch.nn.ReLU(),
                torch.nn.Linear(32, 1),
            )

        def forward(self, x):
            return self.layers(x)
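
    # The MLP head above takes 768-dim inputs, matching the embedding width of
    # CLIP ViT-L/14, whose normalized image features it scores.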
    def load_model(self) -> Dict[str, Any]:
        import clip  # requires the `clip` package (openai/CLIP)

        model_path = self._download_model()
        mlp = self.MLP(input_size=768)
        state_dict = torch.load(model_path, map_location=DEVICE)
        mlp.load_state_dict(state_dict)
        mlp.to(device=DEVICE)
        mlp.eval()
        clip_model, preprocess = clip.load("ViT-L/14", device=DEVICE)
        return {"mlp": mlp, "clip": clip_model, "preprocess": preprocess}

    @torch.no_grad()
    def score_batch(self, image_batch: List[Image.Image]) -> List[float]:
        preprocess = self.model['preprocess']

        # Duplicate a lone image so the BatchNorm layers always see a batch of
        # at least two; the duplicate's score is dropped again below.
        single_image_mode = len(image_batch) == 1
        if single_image_mode:
            image_batch = image_batch * 2

        image_tensors = torch.cat([preprocess(img).unsqueeze(0) for img in image_batch]).to(DEVICE)
        image_features = self.model['clip'].encode_image(image_tensors).to(torch.float32)
        image_features /= image_features.norm(dim=-1, keepdim=True)
        predictions = self.model['mlp'](image_features).squeeze(-1)
        scores = predictions.clamp(0, 10).float().cpu().numpy()

        final_scores = [round(float(s), 4) for s in scores]
        return final_scores[:1] if single_image_mode else final_scores


class SigLIPScorer(AestheticScorer):
    """Scorer for the Aesthetic Predictor V2.5 SigLIP model."""

    def load_model(self) -> Dict[str, Any]:
        model = AutoModel.from_pretrained(self.repo_id, trust_remote_code=True).to(DEVICE, DTYPE).eval()
        processor = AutoProcessor.from_pretrained(self.repo_id, trust_remote_code=True)
        return {"model": model, "processor": processor}

    @torch.no_grad()
    def score_batch(self, image_batch: List[Image.Image]) -> List[float]:
        inputs = self.model['processor'](
            images=[img.convert("RGB") for img in image_batch],
            return_tensors="pt"
        )
        inputs = {k: v.to(DEVICE) for k, v in inputs.items()}
        inputs['pixel_values'] = inputs['pixel_values'].to(DTYPE)
        # `self.model` is the dict returned by load_model, so call the inner model explicitly.
        logits = self.model['model'](**inputs).logits.squeeze(-1)
        scores = logits.float().cpu().numpy()
        return [round(float(s), 4) for s in scores]


MODEL_REGISTRY: Dict[str, AestheticScorer] = {
    "Aesthetic Shadow V2": PipelineScorer("Aesthetic Shadow V2", "NeoChen1024/aesthetic-shadow-v2-backup"),
    "Waifu Scorer V3": CLIPMLPScorer("Waifu Scorer V3", "Eugeoter/waifu-scorer-v3", "model.pth"),
    # TODO: fill in the correct Hugging Face namespace for the
    # Aesthetic Predictor V2.5 SigLIP checkpoint.
    "Aesthetic V2.5 SigLIP": SigLIPScorer("Aesthetic V2.5 SigLIP", "<repo-owner>/Aesthetic-Predictor-V2-5-SigLIP"),
    "Anime Scorer": ONNXScorer("Anime Scorer", "skytnt/anime-aesthetic", "model.onnx"),
}

_loaded_models_cache: Dict[str, AestheticScorer] = {}
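
# Defining the registry only records metadata; no weights are downloaded or
# moved to the GPU until a scorer's `.model` property is first accessed.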


def get_scorers(model_names: List[str]) -> List[AestheticScorer]:
    """Retrieves and caches scorer instances based on the selected names."""
    # Evict scorers that are no longer selected so their weights are freed
    # before any newly selected models load.
    for name in list(_loaded_models_cache.keys()):
        if name not in model_names:
            _loaded_models_cache[name].release_model()
            del _loaded_models_cache[name]
    return [_loaded_models_cache.setdefault(name, MODEL_REGISTRY[name]) for name in model_names]
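

# Scoring flow: images are loaded in batches, every selected scorer scores the
# whole batch, and per-image rows (plus a cross-model average) are collected
# into a DataFrame for display.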
def evaluate_images(
    files: List[gr.File], selected_model_names: List[str], batch_size: int, progress=gr.Progress(track_tqdm=True)
) -> pd.DataFrame:
    """Main function: scores the uploaded images and returns the results as a pandas DataFrame."""
    if not files:
        gr.Warning("No images uploaded. Please upload files to evaluate.")
        return pd.DataFrame()
    if not selected_model_names:
        gr.Warning("No models selected. Please select at least one model.")
        return pd.DataFrame()

    try:
        # Gradio may hand back tempfile objects (with a .name) or plain path strings.
        image_paths = [Path(getattr(f, "name", f)) for f in files]
        all_results, scorers = [], get_scorers(selected_model_names)

        for i in tqdm(range(0, len(image_paths), batch_size), desc="Processing Batches"):
            batch_paths = image_paths[i : i + batch_size]
            try:
                batch_images = [Image.open(p).convert("RGB") for p in batch_paths]
            except Exception as e:
                gr.Warning(f"Skipping a batch due to an error loading an image: {e}")
                continue

            batch_scores = {scorer.model_name: scorer.score_batch(batch_images) for scorer in scorers}

            for j, path in enumerate(batch_paths):
                result_row = {"Image": str(path), "Filename": path.name}
                scores_for_avg = [batch_scores[s.model_name][j] for s in scorers]
                for scorer in scorers:
                    result_row[scorer.model_name] = batch_scores[scorer.model_name][j]
                result_row["Average Score"] = round(float(np.mean(scores_for_avg)), 4) if scores_for_avg else 0.0
                all_results.append(result_row)

        return pd.DataFrame(all_results) if all_results else pd.DataFrame()

    except Exception as e:
        # gr.Error must be raised (not just constructed) for the message to reach the UI.
        raise gr.Error(f"A critical error occurred: {e}") from e


def create_ui() -> gr.Blocks:
    """Creates and configures the Gradio web interface."""
    all_model_names = list(MODEL_REGISTRY.keys())
    dataframe_headers = ["Image", "Filename"] + all_model_names + ["Average Score"]
    dataframe_datatypes = ["image", "str"] + ["number"] * (len(all_model_names) + 1)

    with gr.Blocks(theme=gr.themes.Soft(primary_hue="blue"), title="Image Aesthetic Scorer") as demo:
        gr.Markdown("# 🖼️ Modern Image Aesthetic Scorer")
        gr.Markdown("Upload images, select models, and click 'Evaluate'. The results table supports **interactive sorting** and **downloading as CSV**.")

        with gr.Row():
            with gr.Column(scale=1):
                input_files = gr.Files(label="Upload Images", file_count="multiple", file_types=["image"])
                model_checkboxes = gr.CheckboxGroup(choices=all_model_names, value=all_model_names, label="Scoring Models")
                batch_size_slider = gr.Slider(minimum=1, maximum=64, value=8, step=1, label="Batch Size", info="Adjust based on your VRAM.")
                with gr.Row():
                    process_button = gr.Button("🚀 Evaluate Images", variant="primary")
                    clear_button = gr.Button("🧹 Clear All")

            with gr.Column(scale=3):
                results_dataframe = gr.DataFrame(
                    headers=dataframe_headers,
                    datatype=dataframe_datatypes,
                    label="Evaluation Scores",
                    interactive=True,
                    height=800,
                    show_download_button=True
                )

        process_button.click(
            fn=evaluate_images,
            inputs=[input_files, model_checkboxes, batch_size_slider],
            outputs=[results_dataframe]
        )

        def clear_outputs():
            # Release all cached models along with the on-screen results.
            for scorer in list(_loaded_models_cache.values()):
                scorer.release_model()
            _loaded_models_cache.clear()
            gr.Info("Cleared results and released models from memory.")
            return pd.DataFrame(), None

        clear_button.click(fn=clear_outputs, outputs=[results_dataframe, input_files])
    return demo


if __name__ == "__main__":
    os.makedirs(CACHE_DIR, exist_ok=True)
    app = create_ui()
    app.queue().launch()  # queue() is required for gr.Progress tracking during long runs