# Source: Hugging Face Space "Phramer_AI", file models.py
# Uploaded by Malaji71 - commit "Update models.py" (6ee37e6, verified) - 18.2 kB
"""
Model management for Phramer AI
By Pariente AI, for MIA TV Series
BAGEL 7B integration with professional photography knowledge enhancement
"""
import spaces
import logging
import tempfile
import os
import re
from typing import Optional, Dict, Any, Tuple
from PIL import Image
from gradio_client import Client, handle_file
from config import get_device_config, PROFESSIONAL_PHOTOGRAPHY_CONFIG
from utils import clean_memory, safe_execute
from professional_photography import (
ProfessionalPhotoAnalyzer,
enhance_flux_prompt_with_professional_knowledge,
professional_analyzer,
export_professional_prompt_enhancement
)
# Module-level logger, named after this module so records can be filtered per module.
logger = logging.getLogger(__name__)
class BaseImageAnalyzer:
    """Common interface shared by the image-analysis backends in this module.

    Concrete analyzers override ``initialize`` and ``analyze_image``; the
    versions defined here only raise ``NotImplementedError``.
    """

    def __init__(self):
        # Starts out False; the subclasses in this module set it to True
        # from their own ``initialize`` implementations.
        self.is_initialized = False
        # Device settings as reported by the project's config module.
        self.device_config = get_device_config()

    def initialize(self) -> bool:
        """Prepare the backend for use.

        Returns:
            True when the backend is ready (per the subclasses' contract).

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError

    def analyze_image(self, image: Image.Image) -> Tuple[str, Dict[str, Any]]:
        """Produce a textual description of ``image`` plus a metadata dict.

        Args:
            image: PIL image to analyze.

        Returns:
            A ``(description, metadata)`` tuple.

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError

    def cleanup(self) -> None:
        """Release resources by delegating to the shared ``clean_memory`` helper."""
        clean_memory()
class BagelAPIAnalyzer(BaseImageAnalyzer):
    """BAGEL 7B model with professional photography knowledge integration.

    The analyzer does not load a model locally: it sends the image and a
    text prompt to a hosted Gradio Space through ``gradio_client.Client``
    and returns the text that comes back.
    """

    def __init__(self):
        super().__init__()
        # Created lazily by ``initialize``; reset to None by ``cleanup``.
        self.client = None
        self.space_url = "Malaji71/Bagel-7B-Demo"
        self.api_endpoint = "/image_understanding"
        # Optional token read from the environment; None means public access.
        self.hf_token = os.getenv("HF_TOKEN")
        self.professional_analyzer = professional_analyzer

    def initialize(self) -> bool:
        """Initialize the BAGEL API client, with authentication when available.

        Tries an authenticated client first when ``HF_TOKEN`` is set; if that
        attempt raises, retries once without the token.

        Returns:
            True if a client was created (or already existed), else False.
        """
        if self.is_initialized:
            return True

        try:
            logger.info("Initializing BAGEL API client for Phramer AI...")

            # Initialize client with token if available
            if self.hf_token:
                logger.info("Using HF token for enhanced API access")
                self.client = Client(self.space_url, hf_token=self.hf_token)
            else:
                logger.info("Using public API access")
                self.client = Client(self.space_url)

            self.is_initialized = True
            logger.info("BAGEL API client initialized successfully")
            return True

        except Exception as e:
            logger.error(f"BAGEL API client initialization failed: {e}")

            # A token-based failure may be an auth problem, so try anonymously.
            if self.hf_token:
                logger.info("Retrying without token...")
                try:
                    self.client = Client(self.space_url)
                    self.is_initialized = True
                    logger.info("BAGEL API client initialized (fallback mode)")
                    return True
                except Exception as e2:
                    logger.error(f"Fallback initialization failed: {e2}")

            return False

    def _get_professional_prompt(self, analysis_type: str = "multimodal") -> str:
        """Build the prompt that embeds the photography knowledge base.

        Args:
            analysis_type: Accepted for API symmetry with the
                ``analyze_for_*`` methods. NOTE(review): the value is not
                used, so every analysis type currently gets the same
                prompt text - confirm whether that is intended.

        Returns:
            The knowledge-enriched prompt, or a shorter generic prompt when
            the knowledge base cannot be imported or formatted.
        """
        try:
            # Imported lazily so a broken knowledge base only degrades the
            # prompt instead of breaking module import.
            from professional_photography import EXPERT_PHOTOGRAPHY_KNOWLEDGE

            # The literal below is sent verbatim to the remote model, which is
            # why its continuation lines are not indented with the code.
            prompt = f"""Analiza esta imagen con conocimiento cinematográfico profesional completo. Estructura tu respuesta así:
GRAMÁTICA: [PLANO] of [SUJETO] [ACCIÓN] [CONTEXTO], [ILUMINACIÓN], [COMPOSICIÓN], shot on [CÁMARA], [LENTE], [CONFIGURACIÓN]
BUSCA ESTOS CAMPOS:
PLANO: Consulta photographic_planes completo:
{EXPERT_PHOTOGRAPHY_KNOWLEDGE.get('photographic_planes', {})}
- Encuentra el plano que coincida exactamente
SUJETO + ACCIÓN: Describe exactamente lo que ves - persona, objeto, qué está haciendo
CONTEXTO: Describe el ambiente/entorno completamente - ubicación, elementos del fondo
ILUMINACIÓN: Consulta lighting_principles completo:
{EXPERT_PHOTOGRAPHY_KNOWLEDGE.get('lighting_principles', {})}
- Encuentra el tipo de iluminación que coincida
COMPOSICIÓN: Consulta composition_rules completo:
{EXPERT_PHOTOGRAPHY_KNOWLEDGE.get('composition_rules', {})}
- Encuentra la técnica de composición que se aplica
CONFIGURACIÓN TÉCNICA: Consulta scene_types completo:
{EXPERT_PHOTOGRAPHY_KNOWLEDGE.get('scene_types', {})}
- Encuentra el setup de cámara apropiado para el tipo de escena
USA TODA LA BASE DE DATOS EXPERT_PHOTOGRAPHY_KNOWLEDGE COMPLETA.
Completa cada campo con la información específica que encuentres. Responde en el formato de gramática especificado."""
            return prompt

        except Exception as e:
            logger.warning(f"Professional knowledge base access failed: {e}")
            return """Analiza esta imagen con conocimiento cinematográfico profesional. Estructura tu respuesta así:
GRAMÁTICA: [PLANO] of [SUJETO] [ACCIÓN] [CONTEXTO], [ILUMINACIÓN], [COMPOSICIÓN], shot on [CÁMARA], [LENTE], [CONFIGURACIÓN]
BUSCA Y COMPLETA CADA CAMPO:
- PLANO: wide shot, medium shot, close-up, etc.
- SUJETO + ACCIÓN: exactamente lo que ves
- CONTEXTO: ambiente y entorno
- ILUMINACIÓN: tipo de luz
- COMPOSICIÓN: técnica utilizada
- CONFIGURACIÓN TÉCNICA: cámara y lente apropiados
Usa tu conocimiento profesional completo para completar cada campo."""

    def _save_temp_image(self, image: Image.Image) -> Optional[str]:
        """Write ``image`` to a temporary PNG file for upload.

        Args:
            image: PIL image; converted to RGB first when in another mode.

        Returns:
            Path of the written file, or None if anything failed. The caller
            owns the file and must delete it (see ``_cleanup_temp_file``).
        """
        temp_path = None
        try:
            # delete=False: the file must outlive this handle so the API
            # client can read it by path later.
            with tempfile.NamedTemporaryFile(delete=False, suffix='.png') as temp_file:
                temp_path = temp_file.name

            if image.mode != 'RGB':
                image = image.convert('RGB')
            image.save(temp_path, 'PNG')
            return temp_path

        except Exception as e:
            logger.error(f"Failed to save temporary image: {e}")
            # The file already exists on disk once NamedTemporaryFile has
            # returned; remove it so a failed convert/save does not leak it.
            if temp_path:
                self._cleanup_temp_file(temp_path)
            return None

    def _cleanup_temp_file(self, file_path: str):
        """Delete ``file_path`` if it exists; failures are logged, not raised."""
        try:
            if file_path and os.path.exists(file_path):
                os.unlink(file_path)
        except Exception as e:
            logger.warning(f"Failed to cleanup temp file: {e}")

    # NOTE(review): the decorator comes from the `spaces` package; its exact
    # semantics are not visible in this file - confirm it is needed for a
    # method that only performs a remote API call.
    @spaces.GPU(duration=60)
    def analyze_image(self, image: Image.Image, prompt: str = None) -> Tuple[str, Dict[str, Any]]:
        """Analyze image using BAGEL API with professional cinematography enhancement.

        Args:
            image: PIL image to analyze.
            prompt: Prompt to send; when None the professional prompt from
                ``_get_professional_prompt`` is used.

        Returns:
            ``(description, metadata)``. On failure the description is a
            short status message and ``metadata`` contains an ``"error"`` key.
        """
        if not self.is_initialized:
            success = self.initialize()
            if not success:
                return "BAGEL API not available", {"error": "API initialization failed"}

        temp_path = None
        metadata = {
            "model": "BAGEL-7B-Professional",
            "device": "api",
            "confidence": 0.9,
            "api_endpoint": self.api_endpoint,
            "space_url": self.space_url,
            "prompt_used": prompt,
            "has_camera_suggestion": False,
            "professional_enhancement": True
        }

        try:
            # Use professional prompt created by professional_photography.py
            if prompt is None:
                prompt = self._get_professional_prompt("multimodal")
                # Record the prompt that is actually sent rather than None.
                metadata["prompt_used"] = prompt

            # Save image to temporary file
            temp_path = self._save_temp_image(image)
            if not temp_path:
                return "Image processing failed", {"error": "Could not save image"}

            logger.info("Calling BAGEL API with professional_photography.py prompt...")

            # Call BAGEL API with professional prompt - let it do its work
            result = self.client.predict(
                image=handle_file(temp_path),
                prompt=prompt,
                show_thinking=False,
                do_sample=True,  # Allow creativity
                text_temperature=0.7,  # Higher temperature for richer descriptions
                max_new_tokens=1024,  # More tokens for detailed analysis
                api_name=self.api_endpoint
            )

            # Extract response without filtering: for a tuple result prefer
            # the second element and fall back to the first when it is empty.
            if isinstance(result, tuple) and len(result) >= 2:
                description = result[1] if result[1] else result[0]
            else:
                description = str(result)

            if isinstance(description, str) and description.strip():
                description = description.strip()

                # Extract camera setup if present.
                # NOTE(review): the prompts built in this class ask for
                # "shot on ..." and never mention CAMERA_SETUP, so this flag
                # may rarely be True - confirm the expected marker.
                has_camera = "CAMERA_SETUP:" in description or "2. CAMERA_SETUP" in description
                metadata["has_camera_suggestion"] = has_camera
                if has_camera:
                    logger.info("BAGEL provided camera setup recommendation")

                # Mark as cinematography enhanced
                metadata["cinematography_context_applied"] = True
            else:
                # Empty or non-text answer: return a placeholder description.
                description = "Professional cinematographic analysis completed"
                metadata["has_camera_suggestion"] = False

            # Update metadata
            metadata.update({
                "response_length": len(description),
                "analysis_type": "professional_enhanced"
            })

            logger.info(f"BAGEL Professional analysis complete: {len(description)} chars")
            return description, metadata

        except Exception as e:
            logger.error(f"BAGEL Professional analysis failed: {e}")
            return "Professional analysis failed", {"error": str(e), "model": "BAGEL-7B-Professional"}

        finally:
            # Runs on every exit path above, including the early returns.
            if temp_path:
                self._cleanup_temp_file(temp_path)

    def analyze_for_cinematic_prompt(self, image: Image.Image) -> Tuple[str, Dict[str, Any]]:
        """Analyze image specifically for cinematic/MIA TV Series prompt generation."""
        cinematic_prompt = self._get_professional_prompt("cinematic")
        return self.analyze_image(image, cinematic_prompt)

    def analyze_for_flux_with_professional_context(self, image: Image.Image) -> Tuple[str, Dict[str, Any]]:
        """Analyze image for FLUX with enhanced professional cinematography context."""
        flux_prompt = self._get_professional_prompt("flux_optimized")
        return self.analyze_image(image, flux_prompt)

    def analyze_for_multiengine_prompt(self, image: Image.Image) -> Tuple[str, Dict[str, Any]]:
        """Analyze image for multi-engine compatibility (Flux, Midjourney, etc.)."""
        multiengine_prompt = self._get_professional_prompt("multimodal")
        return self.analyze_image(image, multiengine_prompt)

    def cleanup(self) -> None:
        """Drop the API client and release shared resources.

        Also clears ``is_initialized`` so a later ``analyze_image`` call
        re-creates the client instead of calling ``predict`` on None.
        Errors are logged as warnings and never raised.
        """
        try:
            self.client = None
            self.is_initialized = False
            super().cleanup()
            logger.info("BAGEL Professional API resources cleaned up")
        except Exception as e:
            logger.warning(f"BAGEL Professional API cleanup warning: {e}")
class FallbackAnalyzer(BaseImageAnalyzer):
    """Enhanced fallback analyzer using professional_photography.py knowledge.

    Works from the image dimensions only, so it needs no remote service.
    """

    def __init__(self):
        super().__init__()
        self.professional_analyzer = professional_analyzer

    def initialize(self) -> bool:
        """Fallback with cinematography enhancement is always ready."""
        self.is_initialized = True
        return True

    def analyze_image(self, image: Image.Image) -> Tuple[str, Dict[str, Any]]:
        """Provide enhanced image description using professional_photography.py.

        Args:
            image: PIL image; only ``image.size`` is read.

        Returns:
            ``(description, metadata)``. If the whole analysis fails (for
            example a zero-height image makes the aspect-ratio division
            raise), a placeholder description is returned and ``metadata``
            carries an ``"error"`` key.
        """
        try:
            width, height = image.size
            aspect_ratio = width / height

            # Use professional_photography.py to analyze basic image properties
            basic_description = f"Professional photograph with {width}x{height} resolution, aspect ratio {aspect_ratio:.2f}"

            # Let professional_photography.py enhance this
            try:
                enhancement_result = export_professional_prompt_enhancement(
                    bagel_output=basic_description,
                    bagel_metadata={"image_size": f"{width}x{height}", "aspect_ratio": aspect_ratio}
                )
                description = enhancement_result["enhanced_prompt"]
                camera_setup = enhancement_result["metadata"].get("technical_context", "")

            except Exception as e:
                logger.warning(f"Professional enhancement failed in fallback: {e}")

                # Basic fallback: pick a canned description from the shape.
                if aspect_ratio > 1.5:
                    description = "Wide shot composition with natural lighting and balanced framing"
                    camera_setup = "shot on Phase One XT, 24-70mm f/4 lens, ISO 100"
                elif aspect_ratio < 0.75:
                    description = "Portrait composition with professional lighting and sharp focus"
                    camera_setup = "shot on Canon EOS R5, 85mm f/1.4 lens, ISO 200"
                else:
                    description = "Balanced composition with professional execution"
                    camera_setup = "shot on Canon EOS R6, 50mm f/1.8 lens, ISO 400"

            metadata = {
                "model": "Professional-Fallback",
                "device": "cpu",
                "confidence": 0.7,
                "image_size": f"{width}x{height}",
                "aspect_ratio": round(aspect_ratio, 2),
                # Only claim a suggestion when one exists: the enhancement
                # result may lack "technical_context", leaving camera_setup "".
                "has_camera_suggestion": bool(camera_setup),
                "camera_setup": camera_setup,
                "professional_enhancement": True,
                "cinematography_context_applied": True
            }

            return description, metadata

        except Exception as e:
            logger.error(f"Professional fallback analysis failed: {e}")
            return "Professional cinematographic analysis", {
                "error": str(e),
                "model": "Professional-Fallback"
            }
class ModelManager:
    """Enhanced manager for handling image analysis models with professional cinematography integration.

    Analyzers are created on first request and cached by model name.
    """

    def __init__(self, preferred_model: str = "bagel-professional"):
        # Model name used whenever a caller does not pass one explicitly.
        self.preferred_model = preferred_model
        # Cache of analyzer instances keyed by model name.
        self.analyzers = {}
        self.current_analyzer = None

    def get_analyzer(self, model_name: str = None) -> Optional[BaseImageAnalyzer]:
        """Get or create analyzer for specified model.

        Args:
            model_name: "bagel-api", "bagel-professional" or "fallback";
                None selects ``preferred_model``. Any other name is served
                by the fallback analyzer.

        Returns:
            The cached or newly created analyzer instance.
        """
        model_name = model_name or self.preferred_model

        if model_name not in self.analyzers:
            if model_name in ["bagel-api", "bagel-professional"]:
                self.analyzers[model_name] = BagelAPIAnalyzer()
            elif model_name == "fallback":
                self.analyzers[model_name] = FallbackAnalyzer()
            else:
                logger.warning(f"Unknown model: {model_name}, using professional fallback")
                model_name = "fallback"
                # Reuse an existing fallback analyzer instead of replacing it
                # every time an unknown name is requested.
                if model_name not in self.analyzers:
                    self.analyzers[model_name] = FallbackAnalyzer()

        return self.analyzers[model_name]

    def analyze_image(self, image: Image.Image, model_name: str = None, analysis_type: str = "multiengine") -> Tuple[str, Dict[str, Any]]:
        """Analyze image with professional cinematography enhancement.

        Args:
            image: PIL image to analyze.
            model_name: Analyzer to use; None selects ``preferred_model``.
            analysis_type: "cinematic", "flux" or "multiengine" select the
                matching ``analyze_for_*`` method when the analyzer has one;
                anything else uses the analyzer's plain ``analyze_image``.

        Returns:
            ``(description, metadata)`` from the chosen analyzer, or from the
            fallback analyzer when the first attempt failed or reported an
            ``"error"`` in its metadata.
        """
        analyzer = self.get_analyzer(model_name)
        if analyzer is None:
            return "No analyzer available", {"error": "Model not found"}

        # Choose analysis method based on type and analyzer capabilities
        if analysis_type == "cinematic" and hasattr(analyzer, 'analyze_for_cinematic_prompt'):
            success, result = safe_execute(analyzer.analyze_for_cinematic_prompt, image)
        elif analysis_type == "flux" and hasattr(analyzer, 'analyze_for_flux_with_professional_context'):
            success, result = safe_execute(analyzer.analyze_for_flux_with_professional_context, image)
        elif analysis_type == "multiengine" and hasattr(analyzer, 'analyze_for_multiengine_prompt'):
            success, result = safe_execute(analyzer.analyze_for_multiengine_prompt, image)
        else:
            success, result = safe_execute(analyzer.analyze_image, image)

        if success and result[1].get("error") is None:
            return result

        # Keep the cause visible in the logs before switching analyzers.
        # NOTE(review): assumes safe_execute returns the error details as its
        # second element on failure - confirm against utils.safe_execute.
        failure_reason = result[1].get("error") if success else result
        logger.warning(f"Primary analysis error: {failure_reason}")

        # Fallback with professional_photography.py
        logger.warning("Primary model failed, using professional fallback")
        fallback_analyzer = self.get_analyzer("fallback")
        fallback_success, fallback_result = safe_execute(fallback_analyzer.analyze_image, image)

        if fallback_success:
            return fallback_result
        return "All analyzers failed", {"error": "Complete analysis failure"}

    def cleanup_all(self) -> None:
        """Clean up all model resources.

        Every analyzer gets its ``cleanup`` call even if an earlier one
        raises; failures are logged and the cache is always cleared.
        """
        for name, analyzer in self.analyzers.items():
            try:
                analyzer.cleanup()
            except Exception as e:
                logger.warning(f"Cleanup failed for analyzer '{name}': {e}")

        self.analyzers.clear()
        clean_memory()
        logger.info("All analyzers cleaned up")
# Global model manager instance, created at import time with
# "bagel-professional" as its preferred model. The module-level
# analyze_image() convenience function delegates to this object.
model_manager = ModelManager(preferred_model="bagel-professional")
def analyze_image(image: Image.Image, model_name: str = None, analysis_type: str = "multiengine") -> Tuple[str, Dict[str, Any]]:
    """
    Module-level shortcut that forwards to the shared ``model_manager``.

    Args:
        image: PIL Image to analyze
        model_name: Optional model name ("bagel-professional", "fallback");
            None lets the manager use its preferred model
        analysis_type: Type of analysis ("multiengine", "cinematic", "flux")

    Returns:
        Tuple of (description, metadata) exactly as produced by
        ``model_manager.analyze_image``
    """
    outcome = model_manager.analyze_image(
        image,
        model_name=model_name,
        analysis_type=analysis_type,
    )
    return outcome
# Public API of this module: the names exported by a star-import.
__all__ = [
"BaseImageAnalyzer",
"BagelAPIAnalyzer",
"FallbackAnalyzer",
"ModelManager",
"model_manager",
"analyze_image"
]