import os
import asyncio
from typing import List, Dict, Optional, Tuple, Any
from dataclasses import dataclass, field
from pathlib import Path
import logging

import cv2
import numpy as np
import torch
import onnxruntime as rt
from PIL import Image
import gradio as gr
from transformers import pipeline
from huggingface_hub import hf_hub_download
import pandas as pd


logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


try:
    # Use the real implementation when the module referenced in the error message
    # below is importable; otherwise fall back to a non-functional placeholder.
    from aesthetic_predictor_v2_5 import convert_v2_5_from_siglip
except ImportError:
    def convert_v2_5_from_siglip(low_cpu_mem_usage=True, trust_remote_code=True):
        logger.warning("Using placeholder for convert_v2_5_from_siglip; Aesthetic V2.5 scores will be meaningless. Ensure the actual implementation is available.")
        mock_model = torch.nn.Sequential(torch.nn.Linear(10, 1))
        mock_preprocessor = lambda images, return_tensors: {"pixel_values": torch.randn(len(images), 3, 224, 224)}
        return mock_model, mock_preprocessor

@dataclass
class EvaluationResult:
    """Data class for storing image evaluation results"""
    file_name: str
    image_path: str
    scores: Dict[str, Optional[float]] = field(default_factory=dict)
    final_score: Optional[float] = None

    def calculate_final_score(self, selected_models: List[str]) -> None:
        """Calculate the average score from selected models"""
        valid_scores = [
            score for model, score in self.scores.items()
            if model in selected_models and score is not None
        ]
        self.final_score = np.mean(valid_scores) if valid_scores else None

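# Hypothetical usage sketch: only models that are both selected and scored
# contribute to the average, e.g.
#   r = EvaluationResult("a.png", "/tmp/a.png",
#                        scores={"waifu_scorer": 7.0, "anime_aesthetic": None})
#   r.calculate_final_score(["waifu_scorer", "anime_aesthetic"])  # final_score -> 7.0
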
class BaseModel:
    """Base class for all evaluation models"""
    def __init__(self, name: str):
        self.name = name
        self.device = 'cuda' if torch.cuda.is_available() else 'cpu'

    async def evaluate_batch(self, images: List[Image.Image]) -> List[Optional[float]]:
        """Evaluate a batch of images"""
        raise NotImplementedError

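# Each concrete model below attempts to load its weights in __init__ and (except
# Aesthetic Shadow) records the outcome in `self.available`; ImageEvaluator only
# registers models that either report available=True or do not track availability.
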
class AestheticShadowModel(BaseModel):
    """Aesthetic Shadow V2 model implementation"""
    def __init__(self):
        super().__init__("Aesthetic Shadow")
        logger.info(f"Loading {self.name} model...")
        self.model = pipeline(
            "image-classification",
            model="NeoChen1024/aesthetic-shadow-v2-backup",
            device=0 if self.device == 'cuda' else -1
        )

    async def evaluate_batch(self, images: List[Image.Image]) -> List[Optional[float]]:
        try:
            results = self.model(images)
            scores = []
            for result_set in results:
                # The pipeline normally returns a list of label/score dicts per image;
                # wrap single dicts and unwrap an extra nesting level defensively.
                if not isinstance(result_set, list):
                    result_set = [result_set]

                current_image_predictions = result_set
                if (isinstance(result_set, list) and len(result_set) > 0
                        and isinstance(result_set[0], list) and len(images) == 1):
                    current_image_predictions = result_set[0]

                hq_score = next((p['score'] for p in current_image_predictions if p['label'] == 'hq'), 0)
                scores.append(float(np.clip(hq_score * 10.0, 0.0, 10.0)))
            return scores
        except Exception as e:
            logger.error(f"Error in {self.name}: {e}")
            return [None] * len(images)

class WaifuScorerModel(BaseModel):
    """Waifu Scorer V3 model implementation"""
    def __init__(self):
        super().__init__("Waifu Scorer")
        logger.info(f"Loading {self.name} model...")
        self._load_model()

    def _load_model(self):
        try:
            import clip

            self.mlp = self._create_mlp()
            model_path = hf_hub_download("Eugeoter/waifu-scorer-v3", "model.pth")
            state_dict = torch.load(model_path, map_location=self.device)

            # Some checkpoints wrap the MLP weights in a "layers." prefix; strip it
            # so the keys match the plain Sequential built in _create_mlp().
            if any(key.startswith("layers.") for key in state_dict.keys()):
                new_state_dict = {}
                for k, v in state_dict.items():
                    if k.startswith("layers."):
                        new_state_dict[k[len("layers."):]] = v
                    else:
                        new_state_dict[k] = v
                state_dict = new_state_dict

            self.mlp.load_state_dict(state_dict)
            self.mlp.to(self.device).eval()

            self.clip_model, self.preprocess = clip.load("ViT-L/14", device=self.device)
            self.available = True
        except ImportError:
            logger.error(f"Failed to load {self.name}: PyPI package 'clip' (openai-clip) not found. Please install it.")
            self.available = False
        except Exception as e:
            logger.error(f"Failed to load {self.name}: {e}")
            self.available = False

    def _create_mlp(self) -> torch.nn.Module:
        """Create the MLP architecture"""
        return torch.nn.Sequential(
            torch.nn.Linear(768, 2048),
            torch.nn.ReLU(),
            torch.nn.BatchNorm1d(2048),
            torch.nn.Dropout(0.3),
            torch.nn.Linear(2048, 512),
            torch.nn.ReLU(),
            torch.nn.BatchNorm1d(512),
            torch.nn.Dropout(0.3),
            torch.nn.Linear(512, 256),
            torch.nn.ReLU(),
            torch.nn.BatchNorm1d(256),
            torch.nn.Dropout(0.2),
            torch.nn.Linear(256, 128),
            torch.nn.ReLU(),
            torch.nn.BatchNorm1d(128),
            torch.nn.Dropout(0.1),
            torch.nn.Linear(128, 32),
            torch.nn.ReLU(),
            torch.nn.Linear(32, 1)
        )

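    # Note: the 768-dim input assumes CLIP ViT-L/14 image embeddings (the encoder
    # loaded in _load_model()); swapping in another CLIP variant would require
    # changing the first Linear layer to that model's embedding size.
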
    async def evaluate_batch(self, images: List[Image.Image]) -> List[Optional[float]]:
        if not self.available:
            return [None] * len(images)

        try:
            # Run inference without tracking gradients. A @torch.no_grad() decorator
            # on an async method may only cover coroutine creation, not the awaited
            # body, so the context manager is used inside the body instead.
            with torch.no_grad():
                image_tensors = torch.cat([self.preprocess(img).unsqueeze(0) for img in images])
                image_tensors = image_tensors.to(self.device)

                features = self.clip_model.encode_image(image_tensors)
                features = features.float()
                features = features / features.norm(dim=-1, keepdim=True)
                predictions = self.mlp(features)

            scores = predictions.clamp(0, 10).cpu().numpy().flatten().tolist()
            return scores
        except Exception as e:
            logger.error(f"Error in {self.name}: {e}")
            return [None] * len(images)

class AestheticPredictorV25Model(BaseModel):
    """Aesthetic Predictor V2.5 model implementation"""
    def __init__(self):
        super().__init__("Aesthetic V2.5")
        logger.info(f"Loading {self.name} model...")
        try:
            self.model, self.preprocessor = convert_v2_5_from_siglip(
                low_cpu_mem_usage=True,
                trust_remote_code=True,
            )
            if self.device == 'cuda':
                self.model = self.model.to(torch.bfloat16).cuda()
            self.available = True
        except Exception as e:
            logger.error(f"Failed to load {self.name}: {e}. Ensure 'aesthetic_predictor_v2_5.py' is correct and dependencies are installed.")
            self.available = False
            self.model, self.preprocessor = None, None

    async def evaluate_batch(self, images: List[Image.Image]) -> List[Optional[float]]:
        if not self.available:
            return [None] * len(images)
        try:
            images_rgb = [img.convert("RGB") for img in images]
            pixel_values = self.preprocessor(images=images_rgb, return_tensors="pt")["pixel_values"]

            if self.device == 'cuda':
                pixel_values = pixel_values.to(torch.bfloat16).cuda()
            else:
                pixel_values = pixel_values.float()

            # As above, use the no_grad context manager rather than decorating the
            # async method, so the forward pass itself runs without gradients.
            with torch.no_grad():
                logits = self.model(pixel_values).logits

            scores = logits.squeeze().float().cpu().numpy()
            if scores.ndim == 0:
                scores = np.array([scores.item()])

            return [float(np.clip(s, 0.0, 10.0)) for s in scores]
        except Exception as e:
            logger.error(f"Error in {self.name}: {e}")
            return [None] * len(images)

class AnimeAestheticModel(BaseModel):
    """Anime Aesthetic model implementation"""
    def __init__(self):
        super().__init__("Anime Score")
        logger.info(f"Loading {self.name} model...")
        try:
            model_path = hf_hub_download(repo_id="skytnt/anime-aesthetic", filename="model.onnx")
            self.session = rt.InferenceSession(model_path, providers=['CPUExecutionProvider'])
            self.available = True
        except Exception as e:
            logger.error(f"Failed to load {self.name}: {e}")
            self.available = False
            self.session = None

    async def evaluate_batch(self, images: List[Image.Image]) -> List[Optional[float]]:
        if not self.available:
            return [None] * len(images)
        scores = []
        for img in images:
            try:
                score = self._process_single_image(img)
                scores.append(float(np.clip(score * 10.0, 0.0, 10.0)))
            except Exception as e:
                logger.error(f"Error in {self.name} for single image processing: {e}")
                scores.append(None)
        return scores

    def _process_single_image(self, img: Image.Image) -> float:
        """Process a single image through the model"""
        img_rgb = img.convert("RGB")
        img_np = np.array(img_rgb).astype(np.float32) / 255.0

        # Letterbox to a 224x224 canvas: resize the longer side to 224, keep the
        # aspect ratio, and pad the remainder with mid-gray (0.5).
        size = 224
        h, w = img_np.shape[:2]

        if h > w:
            new_h, new_w = size, int(size * w / h)
        else:
            new_h, new_w = int(size * h / w), size

        resized_img = cv2.resize(img_np, (new_w, new_h), interpolation=cv2.INTER_AREA)

        canvas = np.ones((size, size, 3), dtype=np.float32) * 0.5
        pad_h = (size - new_h) // 2
        pad_w = (size - new_w) // 2
        canvas[pad_h:pad_h + new_h, pad_w:pad_w + new_w, :] = resized_img

        # The ONNX session is fed an NCHW float32 tensor under the input name "img".
        input_tensor = np.transpose(canvas, (2, 0, 1))[np.newaxis, :].astype(np.float32)
        return self.session.run(None, {"img": input_tensor})[0].item()

class ImageEvaluator:
    """Main class for managing image evaluation"""
    def __init__(self):
        self.models: Dict[str, BaseModel] = {}
        self._initialize_models()
        self.results: List[EvaluationResult] = []

    def _initialize_models(self):
        """Initialize all evaluation models"""
        model_classes = [
            ("aesthetic_shadow", AestheticShadowModel),
            ("waifu_scorer", WaifuScorerModel),
            ("aesthetic_predictor_v2_5", AestheticPredictorV25Model),
            ("anime_aesthetic", AnimeAestheticModel),
        ]

        for key, model_class in model_classes:
            try:
                model_instance = model_class()

                if hasattr(model_instance, 'available') and model_instance.available:
                    self.models[key] = model_instance
                    logger.info(f"Successfully loaded and initialized {model_instance.name} ({key})")
                elif not hasattr(model_instance, 'available'):
                    self.models[key] = model_instance
                    logger.info(f"Successfully loaded and initialized {model_instance.name} ({key}) (availability not explicitly tracked)")
                else:
                    logger.warning(f"{model_instance.name} ({key}) was not loaded successfully and will be skipped.")
            except Exception as e:
                logger.error(f"Failed to initialize {key}: {e}")

    async def evaluate_images(
        self,
        file_paths: List[Any],
        selected_models: List[str],
        batch_size: int = 8,
        progress_callback=None
    ) -> Tuple[List[EvaluationResult], List[str]]:
        """Evaluate images with selected models"""
        logs = []
        current_results = []

        images_data = []
        for path_obj in file_paths:
            # Gradio's File component may yield objects exposing a .name attribute;
            # plain string paths are accepted as well.
            path = path_obj if isinstance(path_obj, str) else path_obj.name
            try:
                img = Image.open(path).convert("RGB")
                images_data.append({"image": img, "path": path, "name": Path(path).name})
            except Exception as e:
                logs.append(f"Failed to load {Path(path).name}: {e}")

        if not images_data:
            logs.append("No valid images to process")
            return current_results, logs

        logs.append(f"Loaded {len(images_data)} images")

        active_selected_models = [m_key for m_key in selected_models if m_key in self.models]
        if len(active_selected_models) != len(selected_models):
            disabled_models = set(selected_models) - set(active_selected_models)
            logs.append(f"Warning: The following models were selected but are not available: {', '.join(disabled_models)}")

        for data in images_data:
            result = EvaluationResult(
                file_name=data["name"],
                image_path=data["path"]
            )
            current_results.append(result)

        total_images = len(images_data)
        processed_count = 0

        for model_key in active_selected_models:
            model_instance = self.models[model_key]
            logs.append(f"Processing with {model_instance.name}...")

            for i in range(0, total_images, batch_size):
                batch_data = images_data[i:i + batch_size]
                batch_images_pil = [d["image"] for d in batch_data]

                try:
                    scores = await model_instance.evaluate_batch(batch_images_pil)
                    for k, score in enumerate(scores):
                        current_results[i + k].scores[model_key] = score
                except Exception as e:
                    logger.error(f"Error evaluating batch with {model_instance.name}: {e}")
                    for k in range(len(batch_images_pil)):
                        current_results[i + k].scores[model_key] = None

                processed_count += len(batch_images_pil)
                if progress_callback:
                    # Overall progress: fraction of models already completed plus the
                    # fraction of the current model's images processed so far.
                    current_model_idx = active_selected_models.index(model_key)
                    overall_progress = ((current_model_idx / len(active_selected_models)) +
                                        ((i + len(batch_data)) / total_images) / len(active_selected_models)) * 100
                    progress_callback(min(overall_progress, 100), f"Model: {model_instance.name}, Batch {i//batch_size + 1}")

        for result in current_results:
            result.calculate_final_score(active_selected_models)

        logs.append("Evaluation complete.")
        self.results = current_results
        return current_results, logs

    def get_results_dataframe(self, selected_models_keys: List[str]) -> pd.DataFrame:
        if not self.results:
            return pd.DataFrame()

        data = []
        valid_selected_models_keys = [key for key in selected_models_keys if key in self.models]

        for result in self.results:
            row = {
                'File Name': result.file_name,
                # Thumbnail shown in the results table; this column is dropped again
                # in generate_csv_for_download() before exporting to CSV.
                'Image': gr.Image(result.image_path, type="pil", height=100, width=100)
            }

            for model_key in valid_selected_models_keys:
                model_name = self.models[model_key].name
                score = result.scores.get(model_key)
                row[model_name] = f"{score:.4f}" if score is not None else "N/A"

            row['Final Score'] = f"{result.final_score:.4f}" if result.final_score is not None else "N/A"
            data.append(row)

        column_order = ['File Name', 'Image'] + \
                       [self.models[key].name for key in valid_selected_models_keys] + \
                       ['Final Score']

        df = pd.DataFrame(data)
        if not df.empty:
            df = df[column_order]
        return df

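# Hypothetical headless usage sketch (no Gradio UI; assumes local image paths and
# that at least one model loaded successfully):
#   evaluator = ImageEvaluator()
#   keys = list(evaluator.models.keys())
#   results, logs = asyncio.run(evaluator.evaluate_images(["sample.jpg"], keys, batch_size=4))
#   print(evaluator.get_results_dataframe(keys))
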
def create_interface():
    """Create the Gradio interface"""
    evaluator = ImageEvaluator()

    model_options = [
        (model.name, key) for key, model in evaluator.models.items()
    ]

    default_selected_model_labels = [name for name, key in model_options]

    with gr.Blocks(theme=gr.themes.Soft(), title="Image Evaluation Tool") as demo:
        gr.Markdown("""
        # 🎨 Advanced Image Evaluation Tool

        Evaluate images using state-of-the-art aesthetic and quality prediction models.
        Upload your images and select the models you want to use for evaluation.
        """)

        with gr.Row():
            with gr.Column(scale=1):
                input_files = gr.File(
                    label="Upload Images",
                    file_count="multiple",
                    file_types=["image"]
                )

                model_checkboxes = gr.CheckboxGroup(
                    choices=[label for label, _ in model_options],
                    value=default_selected_model_labels,
                    label="Select Models",
                    info="Choose which models to use for evaluation. Models that failed to load will not be available."
                )

                batch_size_slider = gr.Slider(
                    minimum=1,
                    maximum=32,
                    value=8,
                    step=1,
                    label="Batch Size",
                    info="Number of images to process at once per model."
                )

                with gr.Row():
                    evaluate_btn = gr.Button("🚀 Evaluate Images", variant="primary", scale=2)
                    clear_btn = gr.Button("🗑️ Clear", variant="secondary", scale=1)

            with gr.Column(scale=3):
                logs_display = gr.Textbox(
                    label="Processing Logs",
                    lines=10,
                    max_lines=20,
                    autoscroll=True,
                    interactive=False
                )

                progress_status = gr.Label(label="Progress")

                results_df_display = gr.Dataframe(
                    label="Evaluation Results",
                    interactive=False,
                    wrap=True,
                )

                download_button = gr.Button("📥 Download Results (CSV)", variant="secondary")

                download_file_output_component = gr.File(label="Download", visible=False)

        async def run_evaluation(files, selected_model_labels, current_batch_size, progress=gr.Progress(track_tqdm=True)):
            # Three outputs are wired to this handler: logs, results table, status.
            if not files:
                return "Please upload images first.", pd.DataFrame(), "No files uploaded."

            selected_model_keys = [key for label, key in model_options if label in selected_model_labels]

            if not selected_model_keys:
                return "Please select at least one model.", pd.DataFrame(), "No models selected."

            progress_updates = []

            def progress_callback_for_eval(p_value, p_desc):
                progress(p_value / 100, desc=p_desc)
                progress_updates.append(f"{p_desc} - {p_value:.0f}%")

            processed_results, log_messages = await evaluator.evaluate_images(
                files,
                selected_model_keys,
                int(current_batch_size),
                progress_callback_for_eval
            )

            df = evaluator.get_results_dataframe(selected_model_keys)
            log_text = "\n".join(log_messages + progress_updates)

            final_status = "Evaluation complete." if processed_results else "Evaluation failed or no results."
            progress(1.0, desc=final_status)

            return log_text, df, final_status

        def handle_model_selection_change(selected_model_labels_updated):
            if not evaluator.results:
                return pd.DataFrame()

            selected_model_keys_updated = [key for label, key in model_options if label in selected_model_labels_updated]

            for res_obj in evaluator.results:
                res_obj.calculate_final_score(selected_model_keys_updated)

            return evaluator.get_results_dataframe(selected_model_keys_updated)

        def clear_all_outputs():
            evaluator.results = []
            return "", pd.DataFrame(), "Cleared.", None

        def generate_csv_for_download(selected_model_labels_for_csv):
            if not evaluator.results:
                gr.Warning("No results to download.")
                return None

            selected_model_keys_for_csv = [key for label, key in model_options if label in selected_model_labels_for_csv]

            df_for_csv = evaluator.get_results_dataframe(selected_model_keys_for_csv).copy()
            if 'Image' in df_for_csv.columns:
                df_for_csv.drop(columns=['Image'], inplace=True)

            if df_for_csv.empty:
                gr.Warning("No data to download based on current selection.")
                return None

            import tempfile
            with tempfile.NamedTemporaryFile(mode='w+', delete=False, suffix='.csv', encoding='utf-8') as tmp_file:
                df_for_csv.to_csv(tmp_file.name, index=False)
            return tmp_file.name

        evaluate_btn.click(
            fn=run_evaluation,
            inputs=[input_files, model_checkboxes, batch_size_slider],
            outputs=[logs_display, results_df_display, progress_status]
        )

        model_checkboxes.change(
            fn=handle_model_selection_change,
            inputs=[model_checkboxes],
            outputs=[results_df_display]
        )

        clear_btn.click(
            fn=clear_all_outputs,
            outputs=[logs_display, results_df_display, progress_status, download_file_output_component]
        )

        download_button.click(
            fn=generate_csv_for_download,
            inputs=[model_checkboxes],
            outputs=[download_file_output_component]
        )

        gr.Markdown("""
        ### 📋 Notes
        - **Model Selection**: Choose which models to use for evaluation. The final score is the average of the selected models. Models that failed to load during startup will not be listed or will be ignored.
        - **Batch Size**: Adjust based on your system's VRAM and RAM. Smaller batches use less memory but may be slower overall.
        - **Results Table**: Displays scores from selected models and the final average. Images are shown as thumbnails.
        - **Download**: Export results (excluding image thumbnails) as a CSV file for further analysis.

        ### 🎯 Score Interpretation (General Guide)
        - **7-10**: High quality/aesthetic appeal
        - **5-7**: Medium quality
        - **0-5**: Lower quality

        _(Note: Score ranges and interpretations can vary between models.)_
        """)

    return demo

if __name__ == "__main__":
    app_interface = create_interface()
    app_interface.queue().launch(debug=True)