CIET / app.py
VOIDER's picture
Update app.py
d093305 verified
raw
history blame
31.2 kB
import os
import asyncio
from typing import List, Dict, Optional, Tuple, Any
from dataclasses import dataclass, field
from pathlib import Path
import logging
import cv2
import numpy as np
import torch
import onnxruntime as rt
from PIL import Image
import gradio as gr
from transformers import pipeline
from huggingface_hub import hf_hub_download
import pandas as pd
# Set up process-wide logging once at import time; every message in this
# module is emitted through the module-level `logger` below.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
# Import the aesthetic predictor factory.
# 'aesthetic_predictor_v2_5.py' must be in the same directory or on PYTHONPATH.
try:
    from aesthetic_predictor_v2_5 import convert_v2_5_from_siglip
except ImportError:
    def convert_v2_5_from_siglip(low_cpu_mem_usage=True, trust_remote_code=True):
        """Stand-in used when 'aesthetic_predictor_v2_5' cannot be imported.

        The previous stand-in returned a dummy ``Linear(10, 1)`` network whose
        input size did not match the dummy preprocessor output and whose
        result had no ``.logits`` attribute, so the model was registered as
        available but every evaluation failed. Raising here instead lets the
        caller detect the missing dependency at load time.

        Args:
            low_cpu_mem_usage: Accepted for signature compatibility; unused.
            trust_remote_code: Accepted for signature compatibility; unused.

        Raises:
            ImportError: Always, because the real implementation is missing.
        """
        raise ImportError(
            "convert_v2_5_from_siglip is unavailable: module "
            "'aesthetic_predictor_v2_5' could not be imported. Ensure the "
            "actual implementation is available."
        )
@dataclass
class EvaluationResult:
    """Per-image record holding each model's score and the combined score."""
    file_name: str
    image_path: str
    # Maps a model key to that model's score; None marks a failed evaluation.
    scores: Dict[str, Optional[float]] = field(default_factory=dict)
    final_score: Optional[float] = None

    def calculate_final_score(self, selected_models: List[str]) -> None:
        """Set ``final_score`` to the mean of the usable selected scores.

        A score is usable when its model key is in ``selected_models`` and its
        value is not None. With no usable score, ``final_score`` becomes None.
        """
        usable = []
        for model_key, value in self.scores.items():
            if value is None or model_key not in selected_models:
                continue
            usable.append(value)

        if usable:
            self.final_score = np.mean(usable)
        else:
            self.final_score = None
class BaseModel:
    """Common interface implemented by every scoring model."""

    def __init__(self, name: str):
        # Use the GPU whenever torch reports one, otherwise stay on the CPU.
        if torch.cuda.is_available():
            chosen_device = 'cuda'
        else:
            chosen_device = 'cpu'
        self.device = chosen_device
        self.name = name

    async def evaluate_batch(self, images: List[Image.Image]) -> List[Optional[float]]:
        """Score a batch of images; subclasses must override this."""
        raise NotImplementedError
class AestheticShadowModel(BaseModel):
    """Aesthetic Shadow V2 model implementation."""

    def __init__(self):
        super().__init__("Aesthetic Shadow")
        logger.info(f"Loading {self.name} model...")
        self.model = pipeline(
            "image-classification",
            model="NeoChen1024/aesthetic-shadow-v2-backup",
            device=0 if self.device == 'cuda' else -1
        )

    @staticmethod
    def _hq_score(predictions) -> float:
        """Return the 'hq' class probability of one image, scaled to 0-10."""
        if isinstance(predictions, dict):
            predictions = [predictions]
        # Some pipeline versions wrap a single image's result in an extra list.
        while predictions and isinstance(predictions[0], list):
            predictions = predictions[0]
        hq_probability = next(
            (p['score'] for p in predictions if p['label'] == 'hq'), 0
        )
        return float(np.clip(hq_probability * 10.0, 0.0, 10.0))

    async def evaluate_batch(self, images: List[Image.Image]) -> List[Optional[float]]:
        """Score ``images`` with the classification pipeline.

        Args:
            images: Images to evaluate.

        Returns:
            One score in [0, 10] per image (the 'hq' class probability times
            ten, or 0 when no 'hq' label is present). If the pipeline fails,
            a list of None with the same length as ``images``.
        """
        try:
            results = self.model(images)
            # Normalise to one prediction list per image. A flat list of
            # label dicts is the result for a single image; iterating it
            # directly would yield one score per label, not per image.
            if isinstance(results, dict):
                results = [[results]]
            elif results and isinstance(results[0], dict):
                results = [results]
            if len(results) != len(images):
                raise ValueError(
                    f"pipeline returned {len(results)} results for {len(images)} images"
                )
            return [self._hq_score(per_image) for per_image in results]
        except Exception as e:
            logger.error(f"Error in {self.name}: {e}")
            return [None] * len(images)
class WaifuScorerModel(BaseModel):
    """Waifu Scorer V3 model implementation (CLIP ViT-L/14 features + MLP head)."""

    def __init__(self):
        super().__init__("Waifu Scorer")
        logger.info(f"Loading {self.name} model...")
        self._load_model()

    def _load_model(self):
        """Load the MLP head and the CLIP encoder.

        Sets ``self.available`` to True on success and False on any failure;
        failures are logged rather than raised.
        """
        try:
            import clip
            self.mlp = self._create_mlp()
            model_path = hf_hub_download("Eugeoter/waifu-scorer-v3", "model.pth")
            # SECURITY NOTE(review): torch.load unpickles a file downloaded
            # from the Hub. Consider `weights_only=True` if the installed
            # torch version supports it.
            state_dict = torch.load(model_path, map_location=self.device)
            # The checkpoint may prefix its keys with "layers." (e.g.
            # "layers.0.weight"), while the bare Sequential built by
            # _create_mlp uses "0.weight". Strip the prefix where present.
            prefix = "layers."
            state_dict = {
                (key[len(prefix):] if key.startswith(prefix) else key): value
                for key, value in state_dict.items()
            }
            self.mlp.load_state_dict(state_dict)
            self.mlp.to(self.device).eval()
            self.clip_model, self.preprocess = clip.load("ViT-L/14", device=self.device)
            self.available = True
        except ImportError:
            logger.error(f"Failed to load {self.name}: PyPI package 'clip' (openai-clip) not found. Please install it.")
            self.available = False
        except Exception as e:
            logger.error(f"Failed to load {self.name}: {e}")
            self.available = False

    def _create_mlp(self) -> torch.nn.Module:
        """Build the regression head mapping a 768-d feature to one score."""
        return torch.nn.Sequential(
            torch.nn.Linear(768, 2048),
            torch.nn.ReLU(),
            torch.nn.BatchNorm1d(2048),
            torch.nn.Dropout(0.3),
            torch.nn.Linear(2048, 512),
            torch.nn.ReLU(),
            torch.nn.BatchNorm1d(512),
            torch.nn.Dropout(0.3),
            torch.nn.Linear(512, 256),
            torch.nn.ReLU(),
            torch.nn.BatchNorm1d(256),
            torch.nn.Dropout(0.2),
            torch.nn.Linear(256, 128),
            torch.nn.ReLU(),
            torch.nn.BatchNorm1d(128),
            torch.nn.Dropout(0.1),
            torch.nn.Linear(128, 32),
            torch.nn.ReLU(),
            torch.nn.Linear(32, 1)
        )

    async def evaluate_batch(self, images: List[Image.Image]) -> List[Optional[float]]:
        """Score ``images``.

        Args:
            images: Images to evaluate.

        Returns:
            One score clamped to [0, 10] per image, or a list of None (same
            length as ``images``) when the model is unavailable or inference
            fails.
        """
        if not self.available:
            return [None] * len(images)
        try:
            # no_grad must be entered inside the coroutine body. As a
            # decorator on an `async def` it only wraps creation of the
            # coroutine object, so the body would run with autograd enabled
            # and `.numpy()` below would raise on a grad-requiring tensor.
            with torch.no_grad():
                image_tensors = torch.cat(
                    [self.preprocess(img).unsqueeze(0) for img in images]
                )
                image_tensors = image_tensors.to(self.device)
                features = self.clip_model.encode_image(image_tensors)
                features = features.float()  # the MLP head runs in float32
                features = features / features.norm(dim=-1, keepdim=True)
                predictions = self.mlp(features)
            return predictions.clamp(0, 10).cpu().numpy().flatten().tolist()
        except Exception as e:
            logger.error(f"Error in {self.name}: {e}")
            return [None] * len(images)
class AestheticPredictorV25Model(BaseModel):
    """Aesthetic Predictor V2.5 model implementation."""

    def __init__(self):
        super().__init__("Aesthetic V2.5")
        logger.info(f"Loading {self.name} model...")
        try:
            self.model, self.preprocessor = convert_v2_5_from_siglip(
                low_cpu_mem_usage=True,
                trust_remote_code=True,  # Be cautious with trust_remote_code=True
            )
            if self.device == 'cuda':
                self.model = self.model.to(torch.bfloat16).cuda()
            self.available = True
        except Exception as e:
            logger.error(f"Failed to load {self.name}: {e}. Ensure 'aesthetic_predictor_v2_5.py' is correct and dependencies are installed.")
            self.available = False
            self.model, self.preprocessor = None, None

    async def evaluate_batch(self, images: List[Image.Image]) -> List[Optional[float]]:
        """Score ``images``.

        Args:
            images: Images to evaluate; each is converted to RGB first.

        Returns:
            One score clipped to [0, 10] per image, or a list of None (same
            length as ``images``) when the model is unavailable or inference
            fails.
        """
        if not self.available:
            return [None] * len(images)
        try:
            images_rgb = [img.convert("RGB") for img in images]
            pixel_values = self.preprocessor(images=images_rgb, return_tensors="pt")["pixel_values"]
            if self.device == 'cuda':
                # Match the dtype/device the model was moved to in __init__.
                pixel_values = pixel_values.to(torch.bfloat16).cuda()
            else:
                pixel_values = pixel_values.float()
            # no_grad must be entered inside the coroutine body. As a
            # decorator on an `async def` it is already exited before the
            # body runs, and `.numpy()` would then fail on a tensor that
            # requires grad.
            with torch.no_grad():
                logits = self.model(pixel_values).logits
            # reshape(-1) gives a 1-D vector for any batch size, including a
            # single image, so no 0-dim special case is needed.
            scores = logits.reshape(-1).float().cpu().numpy()
            return [float(np.clip(s, 0.0, 10.0)) for s in scores]
        except Exception as e:
            logger.error(f"Error in {self.name}: {e}")
            return [None] * len(images)
class AnimeAestheticModel(BaseModel):
    """Anime Aesthetic model implementation (ONNX, CPU execution provider)."""

    def __init__(self):
        super().__init__("Anime Score")
        logger.info(f"Loading {self.name} model...")
        try:
            model_path = hf_hub_download(repo_id="skytnt/anime-aesthetic", filename="model.onnx")
            self.session = rt.InferenceSession(model_path, providers=['CPUExecutionProvider'])
            self.available = True
        except Exception as e:
            logger.error(f"Failed to load {self.name}: {e}")
            self.available = False
            self.session = None

    async def evaluate_batch(self, images: List[Image.Image]) -> List[Optional[float]]:
        """Score ``images`` one at a time.

        Returns:
            One score in [0, 10] per image (raw model output times ten,
            clipped). An image whose processing fails gets None; the rest of
            the batch is still evaluated. If the model is unavailable, every
            entry is None.
        """
        if not self.available:
            return [None] * len(images)
        scores = []
        for img in images:
            try:
                score = self._process_single_image(img)
                scores.append(float(np.clip(score * 10.0, 0.0, 10.0)))
            except Exception as e:
                logger.error(f"Error in {self.name} for single image processing: {e}")
                scores.append(None)
        return scores

    def _process_single_image(self, img: Image.Image, size: int = 224) -> float:
        """Letterbox ``img`` onto a square canvas and run it through the model.

        Args:
            img: Input image; converted to RGB and scaled to [0, 1].
            size: Side length of the square model input.

        Returns:
            The raw scalar output of the ONNX session.

        NOTE(review): input size, padding value and channel order are not
        verified against the model. The upstream skytnt/anime-aesthetic demo
        reportedly uses 768 with zero padding -- confirm. The 224 / 0.5-gray /
        RGB values used here are kept unchanged.
        """
        img_np = np.array(img.convert("RGB")).astype(np.float32) / 255.0
        h, w = img_np.shape[:2]
        # Scale the longer side to `size`, preserving aspect ratio. Clamp the
        # shorter side to at least 1 pixel: for extreme aspect ratios it
        # would round down to 0 and cv2.resize would fail.
        if h > w:
            new_h, new_w = size, max(1, int(size * w / h))
        else:
            new_h, new_w = max(1, int(size * h / w)), size
        resized_img = cv2.resize(img_np, (new_w, new_h), interpolation=cv2.INTER_AREA)
        # Centre the resized image on a mid-gray canvas.
        canvas = np.ones((size, size, 3), dtype=np.float32) * 0.5
        pad_h = (size - new_h) // 2
        pad_w = (size - new_w) // 2
        canvas[pad_h:pad_h + new_h, pad_w:pad_w + new_w, :] = resized_img
        # HWC -> NCHW with a leading batch dimension of 1.
        input_tensor = np.transpose(canvas, (2, 0, 1))[np.newaxis, :].astype(np.float32)
        return self.session.run(None, {"img": input_tensor})[0].item()
class ImageEvaluator:
    """Loads the scoring models and runs them over uploaded images."""

    def __init__(self):
        # Model key -> successfully loaded model instance.
        self.models: Dict[str, BaseModel] = {}
        self._initialize_models()
        # Results of the most recent evaluate_images() call.
        self.results: List[EvaluationResult] = []

    def _initialize_models(self):
        """Instantiate every model and keep the ones that loaded.

        A model is kept when it has no ``available`` attribute or when that
        attribute is truthy. Construction errors are logged and the model is
        skipped.
        """
        model_classes = [
            ("aesthetic_shadow", AestheticShadowModel),
            ("waifu_scorer", WaifuScorerModel),
            ("aesthetic_predictor_v2_5", AestheticPredictorV25Model),
            ("anime_aesthetic", AnimeAestheticModel),
        ]
        for key, model_class in model_classes:
            try:
                model_instance = model_class()
            except Exception as e:
                logger.error(f"Failed to initialize {key}: {e}")
                continue
            if not hasattr(model_instance, 'available'):
                self.models[key] = model_instance
                logger.info(f"Successfully loaded and initialized {model_instance.name} ({key}) (availability not explicitly tracked)")
            elif model_instance.available:
                self.models[key] = model_instance
                logger.info(f"Successfully loaded and initialized {model_instance.name} ({key})")
            else:
                logger.warning(f"{model_instance.name} ({key}) was not loaded successfully and will be skipped.")

    @staticmethod
    def _resolve_path(file_ref) -> str:
        """Return the filesystem path for an uploaded-file reference.

        Accepts a plain path (``str`` / ``os.PathLike``) or an object that
        exposes the path as ``.name``.
        """
        if isinstance(file_ref, (str, os.PathLike)):
            return os.fspath(file_ref)
        return file_ref.name

    async def evaluate_images(
        self,
        file_paths: List[str],
        selected_models: List[str],
        batch_size: int = 8,
        progress_callback=None
    ) -> Tuple[List[EvaluationResult], List[str]]:
        """Evaluate images with the selected models.

        Args:
            file_paths: Paths of the images, or objects exposing the path as
                ``.name``.
            selected_models: Model keys to run; keys that are not loaded are
                skipped with a warning in the returned logs.
            batch_size: Images per model call; values below 1 are treated
                as 1.
            progress_callback: Optional ``callback(percent, description)``
                invoked after every batch.

        Returns:
            ``(results, logs)``: one EvaluationResult per successfully loaded
            image, in input order, and the log lines. ``self.results`` is
            replaced with ``results`` unless no image could be loaded.
        """
        logs: List[str] = []
        current_results: List[EvaluationResult] = []
        images_data = []
        for file_ref in file_paths or []:
            try:
                path = self._resolve_path(file_ref)
            except (AttributeError, TypeError) as e:
                logs.append(f"Failed to load {file_ref!r}: {e}")
                continue
            try:
                # Close the file handle as soon as pixels are decoded;
                # convert() returns an independent in-memory image.
                with Image.open(path) as opened:
                    img = opened.convert("RGB")
                images_data.append({"image": img, "path": path, "name": Path(path).name})
            except Exception as e:
                logs.append(f"Failed to load {Path(path).name}: {e}")

        if not images_data:
            logs.append("No valid images to process")
            return current_results, logs
        logs.append(f"Loaded {len(images_data)} images")

        # Only run models that were successfully initialized.
        active_selected_models = [m_key for m_key in selected_models if m_key in self.models]
        if len(active_selected_models) != len(selected_models):
            disabled_models = set(selected_models) - set(active_selected_models)
            logs.append(f"Warning: The following models were selected but are not available: {', '.join(disabled_models)}")

        # One result per image, in the same order as images_data.
        current_results = [
            EvaluationResult(file_name=data["name"], image_path=data["path"])
            for data in images_data
        ]

        total_images = len(images_data)
        batch_size = max(1, int(batch_size))  # range() rejects a zero step
        model_count = len(active_selected_models)
        for model_idx, model_key in enumerate(active_selected_models):
            model_instance = self.models[model_key]
            logs.append(f"Processing with {model_instance.name}...")
            for start in range(0, total_images, batch_size):
                batch_results = current_results[start:start + batch_size]
                batch_images_pil = [d["image"] for d in images_data[start:start + batch_size]]
                try:
                    scores = await model_instance.evaluate_batch(batch_images_pil)
                    if len(scores) != len(batch_images_pil):
                        raise ValueError(
                            f"expected {len(batch_images_pil)} scores, got {len(scores)}"
                        )
                except Exception as e:
                    logger.error(f"Error evaluating batch with {model_instance.name}: {e}")
                    scores = [None] * len(batch_images_pil)
                for result, score in zip(batch_results, scores):
                    result.scores[model_key] = score

                if progress_callback:
                    # Completed models plus the fraction of the current
                    # model's images done, averaged over all active models.
                    model_fraction = (start + len(batch_images_pil)) / total_images
                    overall_progress = (model_idx + model_fraction) / model_count * 100
                    progress_callback(min(overall_progress, 100), f"Model: {model_instance.name}, Batch {start // batch_size + 1}")

        for result in current_results:
            result.calculate_final_score(active_selected_models)
        logs.append("Evaluation complete.")
        self.results = current_results
        return current_results, logs

    def get_results_dataframe(self, selected_models_keys: List[str]) -> pd.DataFrame:
        """Build a display table from ``self.results``.

        Args:
            selected_models_keys: Model keys to show; keys of models that are
                not loaded are ignored.

        Returns:
            A DataFrame with columns 'File Name', 'Image', one column per
            shown model (named after the model) and 'Final Score'. Scores are
            formatted to four decimals, or "N/A" when missing. Empty when
            there are no results.
        """
        if not self.results:
            return pd.DataFrame()

        valid_selected_models_keys = [key for key in selected_models_keys if key in self.models]
        data = []
        for result in self.results:
            row = {
                'File Name': result.file_name,
                # Store the path as plain text. A Gradio component object is
                # not a valid table cell value.
                'Image': result.image_path,
            }
            for model_key in valid_selected_models_keys:
                model_name = self.models[model_key].name
                score = result.scores.get(model_key)
                row[model_name] = f"{score:.4f}" if score is not None else "N/A"
            row['Final Score'] = f"{result.final_score:.4f}" if result.final_score is not None else "N/A"
            data.append(row)

        column_order = (
            ['File Name', 'Image']
            + [self.models[key].name for key in valid_selected_models_keys]
            + ['Final Score']
        )
        df = pd.DataFrame(data)
        if not df.empty:
            df = df[column_order]
        return df
def create_interface():
    """Build the Gradio Blocks UI for the image evaluation tool.

    Instantiates an ImageEvaluator (which loads the models), lays out the
    upload / model-selection / results widgets and wires the event handlers.

    Returns:
        The constructed ``gr.Blocks`` application (not yet launched).
    """
    evaluator = ImageEvaluator()
    # (display label, model key) for every model that loaded successfully.
    model_options = [
        (model.name, key) for key, model in evaluator.models.items()
    ]
    # All loaded models are selected by default.
    default_selected_model_labels = [label for label, _ in model_options]

    def labels_to_keys(labels):
        """Translate checkbox labels back to model keys, keeping load order."""
        chosen = set(labels or [])
        return [key for label, key in model_options if label in chosen]

    with gr.Blocks(theme=gr.themes.Soft(), title="Image Evaluation Tool") as demo:
        # NOTE on Gradio TypeError:
        # "TypeError: argument of type 'bool' is not iterable" raised from
        # `gradio_client/utils.py` during startup usually indicates a Gradio
        # version incompatibility. Upgrade Gradio first
        # (`pip install --upgrade gradio`), or test with a known stable version.
        gr.Markdown("""
        # 🎨 Advanced Image Evaluation Tool
        Evaluate images using state-of-the-art aesthetic and quality prediction models.
        Upload your images and select the models you want to use for evaluation.
        """)
        with gr.Row():
            with gr.Column(scale=1):
                input_files = gr.File(
                    label="Upload Images",
                    file_count="multiple",
                    file_types=["image"]
                )
                model_checkboxes = gr.CheckboxGroup(
                    choices=[label for label, _ in model_options],
                    value=default_selected_model_labels,
                    label="Select Models",
                    info="Choose which models to use for evaluation. Models that failed to load will not be available."
                )
                batch_size_slider = gr.Slider(
                    minimum=1,
                    maximum=32,
                    value=8,
                    step=1,
                    label="Batch Size",
                    info="Number of images to process at once per model."
                )
                with gr.Row():
                    evaluate_btn = gr.Button("🚀 Evaluate Images", variant="primary", scale=2)
                    clear_btn = gr.Button("🗑️ Clear", variant="secondary", scale=1)
            with gr.Column(scale=3):
                logs_display = gr.Textbox(
                    label="Processing Logs",
                    lines=10,
                    max_lines=20,
                    autoscroll=True,
                    interactive=False
                )
                progress_status = gr.Label(label="Progress")
                results_df_display = gr.Dataframe(
                    label="Evaluation Results",
                    interactive=False,
                    wrap=True,
                )
                download_button = gr.Button("📥 Download Results (CSV)", variant="secondary")
                # Hidden until a CSV has been generated; the download handler
                # makes it visible so the user can fetch the file.
                download_file_output_component = gr.File(label="Download", visible=False)

        async def run_evaluation(files, selected_model_labels, current_batch_size, progress=gr.Progress(track_tqdm=True)):
            """Evaluate the uploaded files; returns (log text, table, status)."""
            # Every return must yield exactly three values, matching the
            # three output components wired to evaluate_btn below.
            if not files:
                return "Please upload images first.", pd.DataFrame(), "No files uploaded."
            selected_model_keys = labels_to_keys(selected_model_labels)
            if not selected_model_keys:
                return "Please select at least one model.", pd.DataFrame(), "No models selected."

            progress_updates = []

            def progress_callback_for_eval(p_value, p_desc):
                progress(p_value / 100, desc=p_desc)
                progress_updates.append(f"{p_desc} - {p_value:.0f}%")

            processed_results, log_messages = await evaluator.evaluate_images(
                files,
                selected_model_keys,
                int(current_batch_size),
                progress_callback_for_eval
            )
            df = evaluator.get_results_dataframe(selected_model_keys)
            log_text = "\n".join(log_messages + progress_updates)
            final_status = "Evaluation complete." if processed_results else "Evaluation failed or no results."
            progress(1.0, desc=final_status)
            return log_text, df, final_status

        def handle_model_selection_change(selected_model_labels_updated):
            """Recompute final scores and the table for the new selection."""
            if not evaluator.results:
                return pd.DataFrame()
            selected_model_keys_updated = labels_to_keys(selected_model_labels_updated)
            for res_obj in evaluator.results:
                res_obj.calculate_final_score(selected_model_keys_updated)
            return evaluator.get_results_dataframe(selected_model_keys_updated)

        def clear_all_outputs():
            """Drop stored results and reset the four output components."""
            evaluator.results = []
            return "", pd.DataFrame(), "Cleared.", gr.update(value=None, visible=False)

        def generate_csv_for_download(selected_model_labels_for_csv):
            """Write the current table (without 'Image') to a temp CSV file."""
            nothing_to_offer = gr.update(value=None, visible=False)
            if not evaluator.results:
                gr.Warning("No results to download.")
                return nothing_to_offer
            selected_model_keys_for_csv = labels_to_keys(selected_model_labels_for_csv)
            df_for_csv = evaluator.get_results_dataframe(selected_model_keys_for_csv).copy()
            if 'Image' in df_for_csv.columns:
                df_for_csv.drop(columns=['Image'], inplace=True)
            if df_for_csv.empty:
                gr.Warning("No data to download based on current selection.")
                return nothing_to_offer
            import tempfile
            # Write through the open handle rather than reopening the file by
            # name, which fails on platforms that lock open temporary files.
            with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.csv', encoding='utf-8', newline='') as tmp_file:
                df_for_csv.to_csv(tmp_file, index=False)
            # Reveal the file component so the generated CSV can be downloaded.
            return gr.update(value=tmp_file.name, visible=True)

        evaluate_btn.click(
            fn=run_evaluation,
            inputs=[input_files, model_checkboxes, batch_size_slider],
            outputs=[logs_display, results_df_display, progress_status]
        )
        model_checkboxes.change(
            fn=handle_model_selection_change,
            inputs=[model_checkboxes],
            outputs=[results_df_display]
        )
        clear_btn.click(
            fn=clear_all_outputs,
            outputs=[logs_display, results_df_display, progress_status, download_file_output_component]
        )
        download_button.click(
            fn=generate_csv_for_download,
            inputs=[model_checkboxes],
            outputs=[download_file_output_component]
        )
        gr.Markdown("""
        ### 📝 Notes
        - **Model Selection**: Choose which models to use for evaluation. The final score is the average of the selected models. Models that failed to load during startup will not be listed or will be ignored.
        - **Batch Size**: Adjust based on your system's VRAM and RAM. Smaller batches use less memory but may be slower overall.
        - **Results Table**: Displays scores from selected models and the final average. Images are shown as thumbnails.
        - **Download**: Export results (excluding image thumbnails) as a CSV file for further analysis.
        ### 🎯 Score Interpretation (General Guide)
        - **7-10**: High quality/aesthetic appeal
        - **5-7**: Medium quality
        - **0-5**: Lower quality
        _(Note: Score ranges and interpretations can vary between models.)_
        """)
    return demo
if __name__ == "__main__":
    # Prerequisites: 'aesthetic_predictor_v2_5.py' must exist and 'openai-clip'
    # must be installed for WaifuScorer, e.g.
    #   pip install openai-clip transformers==4.30.2 onnxruntime gradio pandas Pillow opencv-python
    # Check each model's own requirements as well.
    demo_app = create_interface()
    # queue() helps with multiple users and long-running tasks; debug=True
    # surfaces more detailed Gradio errors during development.
    queued_app = demo_app.queue()
    queued_app.launch(debug=True)