""" Ultra Supreme Optimizer - Main optimization engine for image analysis VERSIÓN MEJORADA - Usa el prompt completo de CLIP Interrogator """ # IMPORTANT: spaces must be imported BEFORE torch or any CUDA-using library import spaces import gc import logging from datetime import datetime from typing import Tuple, Dict, Any, Optional import torch import numpy as np from PIL import Image from clip_interrogator import Config, Interrogator from analyzer import UltraSupremeAnalyzer logger = logging.getLogger(__name__) class UltraSupremeOptimizer: """Main optimizer class for ultra supreme image analysis""" def __init__(self): self.interrogator: Optional[Interrogator] = None self.analyzer = UltraSupremeAnalyzer() self.usage_count = 0 self.device = self._get_device() self.is_initialized = False @staticmethod def _get_device() -> str: """Determine the best available device for computation""" if torch.cuda.is_available(): return "cuda" elif torch.backends.mps.is_available(): return "mps" else: return "cpu" def initialize_model(self) -> bool: """Initialize the CLIP interrogator model""" if self.is_initialized: return True try: config = Config( clip_model_name="ViT-L-14/openai", download_cache=True, chunk_size=2048, quiet=True, device=self.device ) self.interrogator = Interrogator(config) self.is_initialized = True # Clean up memory after initialization if self.device == "cpu": gc.collect() else: torch.cuda.empty_cache() return True except Exception as e: logger.error(f"Initialization error: {e}") return False def optimize_image(self, image: Any) -> Optional[Image.Image]: """Optimize image for processing""" if image is None: return None try: # Convert to PIL Image if necessary if isinstance(image, np.ndarray): image = Image.fromarray(image) elif not isinstance(image, Image.Image): image = Image.open(image) # Convert to RGB if necessary if image.mode != 'RGB': image = image.convert('RGB') # Resize if too large max_size = 768 if self.device != "cpu" else 512 if image.size[0] > max_size or image.size[1] > max_size: image.thumbnail((max_size, max_size), Image.Resampling.LANCZOS) return image except Exception as e: logger.error(f"Image optimization error: {e}") return None def apply_flux_rules(self, base_prompt: str) -> str: """Aplica las reglas de Flux a un prompt base de CLIP Interrogator""" # Limpiar el prompt de elementos no deseados cleanup_patterns = [ r',\s*trending on artstation', r',\s*trending on [^,]+', r',\s*\d+k\s*', r',\s*\d+k resolution', r',\s*artstation', r',\s*concept art', r',\s*digital art', r',\s*by greg rutkowski', # Remover artistas genéricos overused ] cleaned_prompt = base_prompt for pattern in cleanup_patterns: cleaned_prompt = re.sub(pattern, '', cleaned_prompt, flags=re.IGNORECASE) # Detectar el tipo de imagen para añadir configuración de cámara apropiada camera_config = "" if any(word in base_prompt.lower() for word in ['portrait', 'person', 'man', 'woman', 'face']): camera_config = ", Shot on Hasselblad X2D 100C, 90mm f/2.5 lens at f/2.8, professional portrait photography" elif any(word in base_prompt.lower() for word in ['landscape', 'mountain', 'nature', 'outdoor']): camera_config = ", Shot on Phase One XT, 40mm f/4 lens at f/8, epic landscape photography" elif any(word in base_prompt.lower() for word in ['street', 'urban', 'city']): camera_config = ", Shot on Leica M11, 35mm f/1.4 lens at f/2.8, documentary street photography" else: camera_config = ", Shot on Phase One XF IQ4, 80mm f/2.8 lens at f/4, professional photography" # Añadir mejoras de iluminación si no están presentes if 'lighting' not in cleaned_prompt.lower(): if 'dramatic' in cleaned_prompt.lower(): cleaned_prompt += ", dramatic cinematic lighting" elif 'portrait' in cleaned_prompt.lower(): cleaned_prompt += ", professional studio lighting with subtle rim light" else: cleaned_prompt += ", masterful natural lighting" # Construir el prompt final final_prompt = cleaned_prompt + camera_config # Asegurar que empiece con mayúscula final_prompt = final_prompt[0].upper() + final_prompt[1:] if final_prompt else final_prompt # Limpiar espacios y comas duplicadas final_prompt = re.sub(r'\s+', ' ', final_prompt) final_prompt = re.sub(r',\s*,+', ',', final_prompt) return final_prompt @spaces.GPU def generate_ultra_supreme_prompt(self, image: Any) -> Tuple[str, str, int, Dict[str, int]]: """ Generate ultra supreme prompt from image usando el pipeline completo Returns: Tuple of (prompt, analysis_info, score, breakdown) """ try: # Initialize model if needed if not self.is_initialized: if not self.initialize_model(): return "❌ Model initialization failed.", "Please refresh and try again.", 0, {} # Validate input if image is None: return "❌ Please upload an image.", "No image provided.", 0, {} self.usage_count += 1 # Optimize image image = self.optimize_image(image) if image is None: return "❌ Image processing failed.", "Invalid image format.", 0, {} start_time = datetime.now() # NUEVO PIPELINE: Usar CLIP Interrogator completo logger.info("ULTRA SUPREME ANALYSIS - Usando pipeline completo de CLIP Interrogator") # 1. Obtener el prompt COMPLETO de CLIP Interrogator (no solo análisis) # Este incluye descripción + artistas + estilos + mediums full_prompt = self.interrogator.interrogate(image) logger.info(f"Prompt completo de CLIP Interrogator: {full_prompt}") # 2. También obtener los análisis individuales para el reporte clip_fast = self.interrogator.interrogate_fast(image) clip_classic = self.interrogator.interrogate_classic(image) logger.info(f"Análisis Fast: {clip_fast}") logger.info(f"Análisis Classic: {clip_classic}") # 3. Aplicar reglas de Flux al prompt completo import re optimized_prompt = self.apply_flux_rules(full_prompt) # 4. Crear análisis para el reporte (simplificado) analysis_summary = { "base_prompt": full_prompt, "clip_fast": clip_fast, "clip_classic": clip_classic, "optimized": optimized_prompt, "detected_style": self._detect_style(full_prompt), "detected_subject": self._detect_subject(full_prompt) } # 5. Calcular score basado en la riqueza del prompt score = self._calculate_score(optimized_prompt, full_prompt) breakdown = { "base_quality": min(len(full_prompt) // 10, 25), "technical_enhancement": 25 if "Shot on" in optimized_prompt else 0, "lighting_quality": 25 if "lighting" in optimized_prompt.lower() else 0, "composition": 25 if any(word in optimized_prompt.lower() for word in ["professional", "masterful", "epic"]) else 0 } score = sum(breakdown.values()) end_time = datetime.now() duration = (end_time - start_time).total_seconds() # Memory cleanup if self.device == "cpu": gc.collect() else: torch.cuda.empty_cache() # Generate analysis report analysis_info = self._generate_analysis_report( analysis_summary, score, breakdown, duration ) return optimized_prompt, analysis_info, score, breakdown except Exception as e: logger.error(f"Ultra supreme generation error: {e}") return f"❌ Error: {str(e)}", "Please try with a different image.", 0, {} def _detect_style(self, prompt: str) -> str: """Detecta el estilo principal del prompt""" styles = { "portrait": ["portrait", "person", "face", "headshot"], "landscape": ["landscape", "mountain", "nature", "scenery"], "street": ["street", "urban", "city"], "artistic": ["artistic", "abstract", "conceptual"], "dramatic": ["dramatic", "cinematic", "moody"] } for style_name, keywords in styles.items(): if any(keyword in prompt.lower() for keyword in keywords): return style_name return "general" def _detect_subject(self, prompt: str) -> str: """Detecta el sujeto principal del prompt""" # Tomar las primeras palabras significativas words = prompt.split(',')[0].split() if len(words) > 3: return ' '.join(words[:4]) return prompt.split(',')[0] def _calculate_score(self, optimized_prompt: str, base_prompt: str) -> int: """Calcula el score basado en la calidad del prompt""" score = 0 # Base score por longitud y riqueza score += min(len(base_prompt) // 10, 25) # Technical enhancement if "Shot on" in optimized_prompt: score += 25 # Lighting quality if "lighting" in optimized_prompt.lower(): score += 25 # Professional quality if any(word in optimized_prompt.lower() for word in ["professional", "masterful", "epic", "cinematic"]): score += 25 return min(score, 100) def _generate_analysis_report(self, analysis: Dict[str, Any], score: int, breakdown: Dict[str, int], duration: float) -> str: """Generate detailed analysis report""" gpu_status = "⚡ ZeroGPU" if torch.cuda.is_available() else "💻 CPU" # Extraer información clave detected_style = analysis.get("detected_style", "general").title() detected_subject = analysis.get("detected_subject", "Unknown") base_prompt_preview = analysis.get("base_prompt", "")[:100] + "..." if len(analysis.get("base_prompt", "")) > 100 else analysis.get("base_prompt", "") analysis_info = f"""**🚀 ULTRA SUPREME ANALYSIS COMPLETE** **Processing:** {gpu_status} • {duration:.1f}s • Full CLIP Interrogator Pipeline **Ultra Score:** {score}/100 • Breakdown: Base({breakdown.get('base_quality',0)}) Technical({breakdown.get('technical_enhancement',0)}) Lighting({breakdown.get('lighting_quality',0)}) Composition({breakdown.get('composition',0)}) **Generation:** #{self.usage_count} **🧠 INTELLIGENT DETECTION:** - **Detected Style:** {detected_style} - **Main Subject:** {detected_subject} - **Pipeline:** CLIP Interrogator → Flux Optimization → Technical Enhancement **📊 CLIP INTERROGATOR ANALYSIS:** - **Base Prompt:** {base_prompt_preview} - **Fast Analysis:** {analysis.get('clip_fast', '')[:80]}... - **Classic Analysis:** {analysis.get('clip_classic', '')[:80]}... **⚡ OPTIMIZATION APPLIED:** - ✅ Preserved CLIP Interrogator's rich description - ✅ Added professional camera specifications - ✅ Enhanced lighting descriptions - ✅ Applied Flux-specific optimizations - ✅ Removed redundant/generic elements **🔬 Powered by Pariente AI Research + CLIP Interrogator**""" return analysis_info