""" | |
Ultra Supreme Optimizer - Main optimization engine for image analysis | |
VERSIÓN MEJORADA - Usa el prompt completo de CLIP Interrogator | |
""" | |
# IMPORTANT: spaces must be imported BEFORE torch or any CUDA-using library
import spaces

import gc
import logging
import re
from datetime import datetime
from typing import Tuple, Dict, Any, Optional

import torch
import numpy as np
from PIL import Image
from clip_interrogator import Config, Interrogator

from analyzer import UltraSupremeAnalyzer

logger = logging.getLogger(__name__)


class UltraSupremeOptimizer:
    """Main optimizer class for ultra supreme image analysis"""

    def __init__(self):
        self.interrogator: Optional[Interrogator] = None
        self.analyzer = UltraSupremeAnalyzer()
        self.usage_count = 0
        self.device = self._get_device()
        self.is_initialized = False
        # Do NOT initialize the model here - it is loaded lazily on first use

    @staticmethod
    def _get_device() -> str:
        """Determine the best available device for computation"""
        if torch.cuda.is_available():
            return "cuda"
        elif torch.backends.mps.is_available():
            return "mps"
        else:
            return "cpu"

    def initialize_model(self) -> bool:
        """Initialize the CLIP interrogator model"""
        if self.is_initialized:
            return True
        try:
            # Initial configuration on CPU
            config = Config(
                clip_model_name="ViT-L-14/openai",
                download_cache=True,
                chunk_size=2048,
                quiet=True,
                device="cpu"  # Always initialize on CPU
            )
            self.interrogator = Interrogator(config)
            self.is_initialized = True
            # Clean up memory after initialization
            gc.collect()
            logger.info("Model initialized successfully on CPU")
            return True
        except Exception as e:
            logger.error(f"Initialization error: {e}")
            return False

    def optimize_image(self, image: Any) -> Optional[Image.Image]:
        """Optimize image for processing"""
        if image is None:
            return None
        try:
            # Convert to PIL Image if necessary
            if isinstance(image, np.ndarray):
                image = Image.fromarray(image)
            elif not isinstance(image, Image.Image):
                image = Image.open(image)
            # Convert to RGB if necessary
            if image.mode != 'RGB':
                image = image.convert('RGB')
            # Resize if too large
            max_size = 768  # Keep the image small to avoid memory issues
            if image.size[0] > max_size or image.size[1] > max_size:
                image.thumbnail((max_size, max_size), Image.Resampling.LANCZOS)
            return image
        except Exception as e:
            logger.error(f"Image optimization error: {e}")
            return None

    def apply_flux_rules(self, base_prompt: str) -> str:
        """Apply the Flux rules to a base prompt produced by CLIP Interrogator"""
        # Remove unwanted elements from the prompt
        cleanup_patterns = [
            r',\s*trending on artstation',
            r',\s*trending on [^,]+',
            r',\s*\d+k\s*',
            r',\s*\d+k resolution',
            r',\s*artstation',
            r',\s*concept art',
            r',\s*digital art',
            r',\s*by greg rutkowski',
        ]
        cleaned_prompt = base_prompt
        for pattern in cleanup_patterns:
            cleaned_prompt = re.sub(pattern, '', cleaned_prompt, flags=re.IGNORECASE)

        # Detect the type of image to choose an appropriate camera configuration
        camera_config = ""
        if any(word in base_prompt.lower() for word in ['portrait', 'person', 'man', 'woman', 'face']):
            camera_config = ", Shot on Hasselblad X2D 100C, 90mm f/2.5 lens at f/2.8, professional portrait photography"
        elif any(word in base_prompt.lower() for word in ['landscape', 'mountain', 'nature', 'outdoor']):
            camera_config = ", Shot on Phase One XT, 40mm f/4 lens at f/8, epic landscape photography"
        elif any(word in base_prompt.lower() for word in ['street', 'urban', 'city']):
            camera_config = ", Shot on Leica M11, 35mm f/1.4 lens at f/2.8, documentary street photography"
        else:
            camera_config = ", Shot on Phase One XF IQ4, 80mm f/2.8 lens at f/4, professional photography"

        # Add lighting enhancements if none are present
        if 'lighting' not in cleaned_prompt.lower():
            if 'dramatic' in cleaned_prompt.lower():
                cleaned_prompt += ", dramatic cinematic lighting"
            elif 'portrait' in cleaned_prompt.lower():
                cleaned_prompt += ", professional studio lighting with subtle rim light"
            else:
                cleaned_prompt += ", masterful natural lighting"

        # Build the final prompt
        final_prompt = cleaned_prompt + camera_config
        # Ensure the prompt starts with a capital letter
        final_prompt = final_prompt[0].upper() + final_prompt[1:] if final_prompt else final_prompt
        # Collapse duplicate whitespace and commas
        final_prompt = re.sub(r'\s+', ' ', final_prompt)
        final_prompt = re.sub(r',\s*,+', ',', final_prompt)
        return final_prompt
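
    # Illustrative example (assumption, not part of the original module):
    #   apply_flux_rules("a woman standing in a field, trending on artstation, 8k")
    # would return:
    #   "A woman standing in a field, masterful natural lighting, Shot on
    #    Hasselblad X2D 100C, 90mm f/2.5 lens at f/2.8, professional portrait
    #    photography"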

    def _prepare_models_for_gpu(self):
        """Prepare the models for GPU with the correct (half) precision"""
        try:
            if hasattr(self.interrogator, 'caption_model'):
                self.interrogator.caption_model = self.interrogator.caption_model.half().to("cuda")
            if hasattr(self.interrogator, 'clip_model'):
                self.interrogator.clip_model = self.interrogator.clip_model.half().to("cuda")
            if hasattr(self.interrogator, 'blip_model'):
                self.interrogator.blip_model = self.interrogator.blip_model.half().to("cuda")
            self.interrogator.config.device = "cuda"
            logger.info("Models prepared for GPU with FP16")
        except Exception as e:
            logger.error(f"Error preparing models for GPU: {e}")
            raise

    def _prepare_models_for_cpu(self):
        """Prepare the models for CPU with float32"""
        try:
            if hasattr(self.interrogator, 'caption_model'):
                self.interrogator.caption_model = self.interrogator.caption_model.float().to("cpu")
            if hasattr(self.interrogator, 'clip_model'):
                self.interrogator.clip_model = self.interrogator.clip_model.float().to("cpu")
            if hasattr(self.interrogator, 'blip_model'):
                self.interrogator.blip_model = self.interrogator.blip_model.float().to("cpu")
            self.interrogator.config.device = "cpu"
            logger.info("Models prepared for CPU with FP32")
        except Exception as e:
            logger.error(f"Error preparing models for CPU: {e}")
            raise
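
    # NOTE (assumption): on a Hugging Face ZeroGPU Space, the call path that
    # reaches this GPU inference is normally decorated with @spaces.GPU (often
    # on the Gradio handler in app.py) so that a GPU is attached for the
    # duration of the call; that decorator is not part of this module.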

    def run_clip_inference(self, image: Image.Image) -> Tuple[str, str, str]:
        """Only CLIP inference runs on the GPU"""
        try:
            # Move the models to GPU in half precision
            self._prepare_models_for_gpu()
            # Use autocast to handle mixed precision; the interrogator performs
            # its own image preprocessing internally
            with torch.cuda.amp.autocast(enabled=True, dtype=torch.float16):
                full_prompt = self._safe_interrogate(image, 'interrogate')
                clip_fast = self._safe_interrogate(image, 'interrogate_fast')
                clip_classic = self._safe_interrogate(image, 'interrogate_classic')
            return full_prompt, clip_fast, clip_classic
        except Exception as e:
            logger.error(f"GPU inference error: {e}")
            # Fall back to CPU
            return self._run_cpu_inference(image)

    def _safe_interrogate(self, image: Image.Image, method: str) -> str:
        """Run an interrogation method safely, catching precision/device errors"""
        try:
            # Look up the requested interrogation method by name and run it
            interrogate_fn = getattr(self.interrogator, method)
            result = interrogate_fn(image)
            return result
        except Exception as e:
            logger.error(f"Error in {method}: {e}")
            return f"Error processing with {method}"

    def _run_cpu_inference(self, image: Image.Image) -> Tuple[str, str, str]:
        """Run inference on CPU as a fallback"""
        try:
            logger.info("Running CPU inference as fallback")
            # Move the models back to CPU in float32
            self._prepare_models_for_cpu()
            # Run on CPU without autocast
            full_prompt = self.interrogator.interrogate(image)
            clip_fast = self.interrogator.interrogate_fast(image)
            clip_classic = self.interrogator.interrogate_classic(image)
            return full_prompt, clip_fast, clip_classic
        except Exception as e:
            logger.error(f"CPU inference also failed: {e}")
            return "Error: Failed to process image", "Error", "Error"

    def generate_ultra_supreme_prompt(self, image: Any) -> Tuple[str, str, int, Dict[str, int]]:
        """
        Generate an ultra supreme prompt from an image using the complete pipeline

        Returns:
            Tuple of (prompt, analysis_info, score, breakdown)
        """
        try:
            # Initialize the model lazily on first use
            if not self.is_initialized:
                if not self.initialize_model():
                    return "❌ Model initialization failed.", "Please refresh and try again.", 0, {}

            # Validate input
            if image is None:
                return "❌ Please upload an image.", "No image provided.", 0, {}

            self.usage_count += 1

            # Optimize image
            image = self.optimize_image(image)
            if image is None:
                return "❌ Image processing failed.", "Invalid image format.", 0, {}

            start_time = datetime.now()
            logger.info("ULTRA SUPREME ANALYSIS - Starting complete pipeline with multi-model analysis")

            # Run CLIP inference
            full_prompt, clip_fast, clip_classic = self.run_clip_inference(image)

            # Check whether inference failed and fall back to minimal prompts
            if "Error" in full_prompt:
                logger.warning("Using fallback prompt due to inference error")
                full_prompt = "A photograph"
                clip_fast = "image"
                clip_classic = "picture"

            logger.info(f"CLIP complete prompt: {full_prompt[:100]}...")

            # NEW: run the ultra supreme analysis across multiple models
            logger.info("Running multi-model ultra supreme analysis...")
            ultra_analysis = self.analyzer.ultra_supreme_analysis(
                image, clip_fast, clip_classic, full_prompt
            )

            # Build an enhanced prompt based on the complete analysis
            enhanced_prompt_parts = []

            # Base prompt from CLIP
            enhanced_prompt_parts.append(full_prompt)

            # Add demographic information if available
            if ultra_analysis["demographic"]["gender"] and ultra_analysis["demographic"]["gender_confidence"] > 0.7:
                gender = ultra_analysis["demographic"]["gender"]
                age_cat = ultra_analysis["demographic"]["age_category"]
                if age_cat:
                    enhanced_prompt_parts.append(f"{age_cat} {gender}")

            # Add the primary emotional state
            if ultra_analysis["emotional_state"]["primary_emotion"] and ultra_analysis["emotional_state"]["emotion_confidence"] > 0.6:
                emotion = ultra_analysis["emotional_state"]["primary_emotion"]
                enhanced_prompt_parts.append(f"{emotion} expression")

            # Add pose information if available
            if ultra_analysis["pose_composition"]["posture"]:
                enhanced_prompt_parts.append(ultra_analysis["pose_composition"]["posture"][0])

            # Combine and apply the Flux rules
            combined_prompt = ", ".join(enhanced_prompt_parts)
            optimized_prompt = self.apply_flux_rules(combined_prompt)

            # If the analyzer produced a richer prompt, use it instead
            analyzer_prompt = self.analyzer.build_ultra_supreme_prompt(ultra_analysis, [full_prompt])
            if len(analyzer_prompt) > len(optimized_prompt):
                optimized_prompt = self.apply_flux_rules(analyzer_prompt)

            # Calculate the score using the analyzer
            score, breakdown = self.analyzer.calculate_ultra_supreme_score(optimized_prompt, ultra_analysis)

            end_time = datetime.now()
            duration = (end_time - start_time).total_seconds()

            # Memory cleanup
            gc.collect()
            if torch.cuda.is_available():
                torch.cuda.empty_cache()

            # Generate the enhanced analysis report with multi-model data
            analysis_info = self._generate_ultra_analysis_report(
                ultra_analysis, score, breakdown, duration
            )

            return optimized_prompt, analysis_info, score, breakdown

        except Exception as e:
            logger.error(f"Ultra supreme generation error: {e}", exc_info=True)
            return f"❌ Error: {str(e)}", "Please try with a different image.", 0, {}

    def _detect_style(self, prompt: str) -> str:
        """Detect the main style of the prompt"""
        styles = {
            "portrait": ["portrait", "person", "face", "headshot"],
            "landscape": ["landscape", "mountain", "nature", "scenery"],
            "street": ["street", "urban", "city"],
            "artistic": ["artistic", "abstract", "conceptual"],
            "dramatic": ["dramatic", "cinematic", "moody"]
        }
        prompt_lower = prompt.lower()
        for style_name, keywords in styles.items():
            if any(keyword in prompt_lower for keyword in keywords):
                return style_name
        return "general"

    def _detect_subject(self, prompt: str) -> str:
        """Detect the main subject of the prompt"""
        if not prompt:
            return "Unknown"
        # Take the first few significant words
        words = prompt.split(',')[0].split()
        if len(words) > 3:
            return ' '.join(words[:4])
        return prompt.split(',')[0] if prompt else "Unknown"

    def _calculate_score(self, optimized_prompt: str, base_prompt: str) -> int:
        """Calculate a score based on prompt quality"""
        score = 0
        # Base score for length and richness
        score += min(len(base_prompt) // 10, 25)
        # Technical enhancement
        if "Shot on" in optimized_prompt:
            score += 25
        # Lighting quality
        if "lighting" in optimized_prompt.lower():
            score += 25
        # Professional quality
        if any(word in optimized_prompt.lower() for word in ["professional", "masterful", "epic", "cinematic"]):
            score += 25
        return min(score, 100)

    def _generate_ultra_analysis_report(self, analysis: Dict[str, Any],
                                        score: int, breakdown: Dict[str, int],
                                        duration: float) -> str:
        """Generate ultra detailed analysis report with multi-model results"""
        device_used = "cuda" if torch.cuda.is_available() else "cpu"
        gpu_status = "⚡ ZeroGPU" if device_used == "cuda" else "💻 CPU"

        # Demographic info
        demo_info = ""
        if analysis["demographic"]["age_category"]:
            age = analysis["demographic"]["age_category"].replace("_", " ").title()
            gender = analysis["demographic"]["gender"] or "person"
            confidence = analysis["demographic"]["age_confidence"]
            demo_info = f"**Detected:** {age} {gender} (confidence: {confidence:.0%})"

        # Emotion info
        emotion_info = ""
        if analysis["emotional_state"]["primary_emotion"]:
            emotion = analysis["emotional_state"]["primary_emotion"]
            confidence = analysis["emotional_state"]["emotion_confidence"]
            emotion_info = f"**Primary Emotion:** {emotion} ({confidence:.0%})"
            # Add emotion distribution if available
            if analysis["emotional_state"]["emotion_distribution"]:
                top_emotions = sorted(
                    analysis["emotional_state"]["emotion_distribution"].items(),
                    key=lambda x: x[1], reverse=True
                )[:3]
                emotion_details = ", ".join([f"{e[0]}: {e[1]:.0%}" for e in top_emotions])
                emotion_info += f"\n**Emotion Distribution:** {emotion_details}"

        # Face analysis info
        face_info = f"**Faces Detected:** {analysis['facial_ultra']['face_count']}"
        if analysis['facial_ultra']['face_count'] > 0:
            features = []
            for feature_type in ['eyes', 'mouth', 'facial_hair', 'skin']:
                if analysis['facial_ultra'].get(feature_type):
                    features.extend(analysis['facial_ultra'][feature_type])
            if features:
                face_info += f"\n**Facial Features:** {', '.join(features[:5])}"

        # Pose info
        pose_info = ""
        if analysis["pose_composition"].get("pose_confidence", 0) > 0:
            confidence = analysis["pose_composition"]["pose_confidence"]
            pose_info = f"**Pose Analysis:** Body detected ({confidence:.0%} confidence)"
            if analysis["pose_composition"]["posture"]:
                pose_info += f"\n**Posture:** {', '.join(analysis['pose_composition']['posture'])}"

        # Environment info
        env_info = ""
        if analysis["environmental"]["setting_type"]:
            env_info = f"**Setting:** {analysis['environmental']['setting_type'].replace('_', ' ').title()}"
        if analysis["environmental"]["lighting_analysis"]:
            env_info += f"\n**Lighting:** {', '.join(analysis['environmental']['lighting_analysis'])}"

        # Intelligence metrics
        metrics = analysis["intelligence_metrics"]

        analysis_info = f"""**🚀 ULTRA SUPREME MULTI-MODEL ANALYSIS COMPLETE**

**Processing:** {gpu_status} • {duration:.1f}s • Multi-Model Pipeline
**Ultra Score:** {score}/100 • Models: CLIP + DeepFace + MediaPipe + Transformers

**📊 BREAKDOWN:**
• Prompt Quality: {breakdown.get('prompt_quality', 0)}/25
• Analysis Depth: {breakdown.get('analysis_depth', 0)}/25
• Model Confidence: {breakdown.get('model_confidence', 0)}/25
• Feature Richness: {breakdown.get('feature_richness', 0)}/25

**🧠 DEEP ANALYSIS RESULTS:**

**👤 DEMOGRAPHICS & IDENTITY:**
{demo_info or "No face detected for demographic analysis"}

**😊 EMOTIONAL ANALYSIS:**
{emotion_info or "No emotional data available"}

**👁️ FACIAL ANALYSIS:**
{face_info}

**🚶 POSE & BODY LANGUAGE:**
{pose_info or "No pose data available"}

**🏞️ ENVIRONMENT & SCENE:**
{env_info or "No environmental data detected"}

**📊 INTELLIGENCE METRICS:**
• **Total Features Detected:** {metrics['total_features_detected']}
• **Analysis Depth Score:** {metrics['analysis_depth_score']}/100
• **Model Confidence Average:** {metrics['model_confidence_average']:.0%}
• **Technical Optimization:** {metrics['technical_optimization_score']}/100

**✨ MULTI-MODEL ADVANTAGES:**
✅ DeepFace: Accurate age, gender, emotion detection
✅ MediaPipe: Body pose and gesture analysis
✅ CLIP: Semantic understanding and context
✅ Transformers: Advanced emotion classification
✅ OpenCV: Robust face detection

**🔬 Powered by Pariente AI Research • Ultra Supreme Intelligence Engine**"""

        return analysis_info
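

# ---------------------------------------------------------------------------
# Minimal usage sketch (assumption): in the Space this class is driven from the
# Gradio app, but it can also be exercised directly. "example.jpg" below is a
# hypothetical input path.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    optimizer = UltraSupremeOptimizer()
    prompt, report, score, breakdown = optimizer.generate_ultra_supreme_prompt(
        Image.open("example.jpg")  # hypothetical image file
    )
    print(prompt)
    print(f"Ultra Score: {score}/100")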