Spaces:
Running
on
Zero
Running
on
Zero
""" | |
Model management for Phramer AI | |
By Pariente AI, for MIA TV Series | |
BAGEL 7B integration with professional photography knowledge enhancement | |
""" | |
import spaces | |
import logging | |
import tempfile | |
import os | |
import re | |
from typing import Optional, Dict, Any, Tuple | |
from PIL import Image | |
from gradio_client import Client, handle_file | |
from config import get_device_config, PROFESSIONAL_PHOTOGRAPHY_CONFIG | |
from utils import clean_memory, safe_execute | |
from professional_photography import ( | |
ProfessionalPhotoAnalyzer, | |
enhance_flux_prompt_with_professional_knowledge, | |
professional_analyzer, | |
export_professional_prompt_enhancement | |
) | |
logger = logging.getLogger(__name__) | |
class BaseImageAnalyzer: | |
"""Base class for image analysis models""" | |
def __init__(self): | |
self.is_initialized = False | |
self.device_config = get_device_config() | |
def initialize(self) -> bool: | |
"""Initialize the model""" | |
raise NotImplementedError | |
def analyze_image(self, image: Image.Image) -> Tuple[str, Dict[str, Any]]: | |
"""Analyze image and return description""" | |
raise NotImplementedError | |
def cleanup(self) -> None: | |
"""Clean up model resources""" | |
clean_memory() | |
class BagelAPIAnalyzer(BaseImageAnalyzer): | |
"""BAGEL 7B model with professional photography knowledge integration""" | |
def __init__(self): | |
super().__init__() | |
self.client = None | |
self.space_url = "Malaji71/Bagel-7B-Demo" | |
self.api_endpoint = "/image_understanding" | |
self.hf_token = os.getenv("HF_TOKEN") | |
self.professional_analyzer = professional_analyzer | |
def initialize(self) -> bool: | |
"""Initialize BAGEL API client with authentication""" | |
if self.is_initialized: | |
return True | |
try: | |
logger.info("Initializing BAGEL API client for Phramer AI...") | |
# Initialize client with token if available | |
if self.hf_token: | |
logger.info("Using HF token for enhanced API access") | |
self.client = Client(self.space_url, hf_token=self.hf_token) | |
else: | |
logger.info("Using public API access") | |
self.client = Client(self.space_url) | |
self.is_initialized = True | |
logger.info("BAGEL API client initialized successfully") | |
return True | |
except Exception as e: | |
logger.error(f"BAGEL API client initialization failed: {e}") | |
if self.hf_token: | |
logger.info("Retrying without token...") | |
try: | |
self.client = Client(self.space_url) | |
self.is_initialized = True | |
logger.info("BAGEL API client initialized (fallback mode)") | |
return True | |
except Exception as e2: | |
logger.error(f"Fallback initialization failed: {e2}") | |
return False | |
def _get_professional_prompt(self, analysis_type: str = "multimodal") -> str: | |
"""Get professional prompt that teaches BAGEL to use the complete knowledge base""" | |
try: | |
# Import the complete knowledge base | |
from professional_photography import EXPERT_PHOTOGRAPHY_KNOWLEDGE | |
# Create the teaching prompt with the complete structure | |
prompt = f"""Analiza esta imagen con conocimiento cinematográfico profesional completo. Estructura tu respuesta así: | |
GRAMÁTICA: [PLANO] of [SUJETO] [ACCIÓN] [CONTEXTO], [ILUMINACIÓN], [COMPOSICIÓN], shot on [CÁMARA], [LENTE], [CONFIGURACIÓN] | |
BUSCA ESTOS CAMPOS: | |
PLANO: Consulta photographic_planes completo: | |
{EXPERT_PHOTOGRAPHY_KNOWLEDGE.get('photographic_planes', {})} | |
- Encuentra el plano que coincida exactamente | |
SUJETO + ACCIÓN: Describe exactamente lo que ves - persona, objeto, qué está haciendo | |
CONTEXTO: Describe el ambiente/entorno completamente - ubicación, elementos del fondo | |
ILUMINACIÓN: Consulta lighting_principles completo: | |
{EXPERT_PHOTOGRAPHY_KNOWLEDGE.get('lighting_principles', {})} | |
- Encuentra el tipo de iluminación que coincida | |
COMPOSICIÓN: Consulta composition_rules completo: | |
{EXPERT_PHOTOGRAPHY_KNOWLEDGE.get('composition_rules', {})} | |
- Encuentra la técnica de composición que se aplica | |
CONFIGURACIÓN TÉCNICA: Consulta scene_types completo: | |
{EXPERT_PHOTOGRAPHY_KNOWLEDGE.get('scene_types', {})} | |
- Encuentra el setup de cámara apropiado para el tipo de escena | |
USA TODA LA BASE DE DATOS EXPERT_PHOTOGRAPHY_KNOWLEDGE COMPLETA. | |
Completa cada campo con la información específica que encuentres. Responde en el formato de gramática especificado.""" | |
return prompt | |
except Exception as e: | |
logger.warning(f"Professional knowledge base access failed: {e}") | |
return """Analiza esta imagen con conocimiento cinematográfico profesional. Estructura tu respuesta así: | |
GRAMÁTICA: [PLANO] of [SUJETO] [ACCIÓN] [CONTEXTO], [ILUMINACIÓN], [COMPOSICIÓN], shot on [CÁMARA], [LENTE], [CONFIGURACIÓN] | |
BUSCA Y COMPLETA CADA CAMPO: | |
- PLANO: wide shot, medium shot, close-up, etc. | |
- SUJETO + ACCIÓN: exactamente lo que ves | |
- CONTEXTO: ambiente y entorno | |
- ILUMINACIÓN: tipo de luz | |
- COMPOSICIÓN: técnica utilizada | |
- CONFIGURACIÓN TÉCNICA: cámara y lente apropiados | |
Usa tu conocimiento profesional completo para completar cada campo.""" | |
def _save_temp_image(self, image: Image.Image) -> str: | |
"""Save image to temporary file for API call""" | |
try: | |
temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.png') | |
temp_path = temp_file.name | |
temp_file.close() | |
if image.mode != 'RGB': | |
image = image.convert('RGB') | |
image.save(temp_path, 'PNG') | |
return temp_path | |
except Exception as e: | |
logger.error(f"Failed to save temporary image: {e}") | |
return None | |
def _cleanup_temp_file(self, file_path: str): | |
"""Clean up temporary file""" | |
try: | |
if file_path and os.path.exists(file_path): | |
os.unlink(file_path) | |
except Exception as e: | |
logger.warning(f"Failed to cleanup temp file: {e}") | |
def analyze_image(self, image: Image.Image, prompt: str = None) -> Tuple[str, Dict[str, Any]]: | |
"""Analyze image using BAGEL API with professional cinematography enhancement""" | |
if not self.is_initialized: | |
success = self.initialize() | |
if not success: | |
return "BAGEL API not available", {"error": "API initialization failed"} | |
temp_path = None | |
metadata = { | |
"model": "BAGEL-7B-Professional", | |
"device": "api", | |
"confidence": 0.9, | |
"api_endpoint": self.api_endpoint, | |
"space_url": self.space_url, | |
"prompt_used": prompt, | |
"has_camera_suggestion": False, | |
"professional_enhancement": True | |
} | |
try: | |
# Use professional prompt created by professional_photography.py | |
if prompt is None: | |
prompt = self._get_professional_prompt("multimodal") | |
# Save image to temporary file | |
temp_path = self._save_temp_image(image) | |
if not temp_path: | |
return "Image processing failed", {"error": "Could not save image"} | |
logger.info("Calling BAGEL API with professional_photography.py prompt...") | |
# Call BAGEL API with professional prompt - let it do its work | |
result = self.client.predict( | |
image=handle_file(temp_path), | |
prompt=prompt, | |
show_thinking=False, | |
do_sample=True, # Allow creativity | |
text_temperature=0.7, # Higher temperature for richer descriptions | |
max_new_tokens=1024, # More tokens for detailed analysis | |
api_name=self.api_endpoint | |
) | |
# Extract response without filtering | |
if isinstance(result, tuple) and len(result) >= 2: | |
description = result[1] if result[1] else result[0] | |
else: | |
description = str(result) | |
if isinstance(description, str) and description.strip(): | |
description = description.strip() | |
# Extract camera setup if present | |
if "CAMERA_SETUP:" in description or "2. CAMERA_SETUP" in description: | |
metadata["has_camera_suggestion"] = True | |
logger.info("BAGEL provided camera setup recommendation") | |
else: | |
metadata["has_camera_suggestion"] = False | |
# Mark as cinematography enhanced | |
metadata["cinematography_context_applied"] = True | |
else: | |
description = "Professional cinematographic analysis completed" | |
metadata["has_camera_suggestion"] = False | |
# Update metadata | |
metadata.update({ | |
"response_length": len(description), | |
"analysis_type": "professional_enhanced" | |
}) | |
logger.info(f"BAGEL Professional analysis complete: {len(description)} chars") | |
return description, metadata | |
except Exception as e: | |
logger.error(f"BAGEL Professional analysis failed: {e}") | |
return "Professional analysis failed", {"error": str(e), "model": "BAGEL-7B-Professional"} | |
finally: | |
if temp_path: | |
self._cleanup_temp_file(temp_path) | |
def analyze_for_cinematic_prompt(self, image: Image.Image) -> Tuple[str, Dict[str, Any]]: | |
"""Analyze image specifically for cinematic/MIA TV Series prompt generation""" | |
cinematic_prompt = self._get_professional_prompt("cinematic") | |
return self.analyze_image(image, cinematic_prompt) | |
def analyze_for_flux_with_professional_context(self, image: Image.Image) -> Tuple[str, Dict[str, Any]]: | |
"""Analyze image for FLUX with enhanced professional cinematography context""" | |
flux_prompt = self._get_professional_prompt("flux_optimized") | |
return self.analyze_image(image, flux_prompt) | |
def analyze_for_multiengine_prompt(self, image: Image.Image) -> Tuple[str, Dict[str, Any]]: | |
"""Analyze image for multi-engine compatibility (Flux, Midjourney, etc.)""" | |
multiengine_prompt = self._get_professional_prompt("multimodal") | |
return self.analyze_image(image, multiengine_prompt) | |
def cleanup(self) -> None: | |
"""Clean up API client resources""" | |
try: | |
if hasattr(self, 'client'): | |
self.client = None | |
super().cleanup() | |
logger.info("BAGEL Professional API resources cleaned up") | |
except Exception as e: | |
logger.warning(f"BAGEL Professional API cleanup warning: {e}") | |
class FallbackAnalyzer(BaseImageAnalyzer): | |
"""Enhanced fallback analyzer using professional_photography.py knowledge""" | |
def __init__(self): | |
super().__init__() | |
self.professional_analyzer = professional_analyzer | |
def initialize(self) -> bool: | |
"""Fallback with cinematography enhancement is always ready""" | |
self.is_initialized = True | |
return True | |
def analyze_image(self, image: Image.Image) -> Tuple[str, Dict[str, Any]]: | |
"""Provide enhanced image description using professional_photography.py""" | |
try: | |
width, height = image.size | |
aspect_ratio = width / height | |
# Use professional_photography.py to analyze basic image properties | |
basic_description = f"Professional photograph with {width}x{height} resolution, aspect ratio {aspect_ratio:.2f}" | |
# Let professional_photography.py enhance this | |
try: | |
enhancement_result = export_professional_prompt_enhancement( | |
bagel_output=basic_description, | |
bagel_metadata={"image_size": f"{width}x{height}", "aspect_ratio": aspect_ratio} | |
) | |
description = enhancement_result["enhanced_prompt"] | |
camera_setup = enhancement_result["metadata"].get("technical_context", "") | |
except Exception as e: | |
logger.warning(f"Professional enhancement failed in fallback: {e}") | |
# Basic fallback | |
if aspect_ratio > 1.5: | |
description = "Wide shot composition with natural lighting and balanced framing" | |
camera_setup = "shot on Phase One XT, 24-70mm f/4 lens, ISO 100" | |
elif aspect_ratio < 0.75: | |
description = "Portrait composition with professional lighting and sharp focus" | |
camera_setup = "shot on Canon EOS R5, 85mm f/1.4 lens, ISO 200" | |
else: | |
description = "Balanced composition with professional execution" | |
camera_setup = "shot on Canon EOS R6, 50mm f/1.8 lens, ISO 400" | |
metadata = { | |
"model": "Professional-Fallback", | |
"device": "cpu", | |
"confidence": 0.7, | |
"image_size": f"{width}x{height}", | |
"aspect_ratio": round(aspect_ratio, 2), | |
"has_camera_suggestion": True, | |
"camera_setup": camera_setup, | |
"professional_enhancement": True, | |
"cinematography_context_applied": True | |
} | |
return description, metadata | |
except Exception as e: | |
logger.error(f"Professional fallback analysis failed: {e}") | |
return "Professional cinematographic analysis", { | |
"error": str(e), | |
"model": "Professional-Fallback" | |
} | |
class ModelManager: | |
"""Enhanced manager for handling image analysis models with professional cinematography integration""" | |
def __init__(self, preferred_model: str = "bagel-professional"): | |
self.preferred_model = preferred_model | |
self.analyzers = {} | |
self.current_analyzer = None | |
def get_analyzer(self, model_name: str = None) -> Optional[BaseImageAnalyzer]: | |
"""Get or create analyzer for specified model""" | |
model_name = model_name or self.preferred_model | |
if model_name not in self.analyzers: | |
if model_name in ["bagel-api", "bagel-professional"]: | |
self.analyzers[model_name] = BagelAPIAnalyzer() | |
elif model_name == "fallback": | |
self.analyzers[model_name] = FallbackAnalyzer() | |
else: | |
logger.warning(f"Unknown model: {model_name}, using professional fallback") | |
model_name = "fallback" | |
self.analyzers[model_name] = FallbackAnalyzer() | |
return self.analyzers[model_name] | |
def analyze_image(self, image: Image.Image, model_name: str = None, analysis_type: str = "multiengine") -> Tuple[str, Dict[str, Any]]: | |
"""Analyze image with professional cinematography enhancement""" | |
analyzer = self.get_analyzer(model_name) | |
if analyzer is None: | |
return "No analyzer available", {"error": "Model not found"} | |
# Choose analysis method based on type and analyzer capabilities | |
if analysis_type == "cinematic" and hasattr(analyzer, 'analyze_for_cinematic_prompt'): | |
success, result = safe_execute(analyzer.analyze_for_cinematic_prompt, image) | |
elif analysis_type == "flux" and hasattr(analyzer, 'analyze_for_flux_with_professional_context'): | |
success, result = safe_execute(analyzer.analyze_for_flux_with_professional_context, image) | |
elif analysis_type == "multiengine" and hasattr(analyzer, 'analyze_for_multiengine_prompt'): | |
success, result = safe_execute(analyzer.analyze_for_multiengine_prompt, image) | |
else: | |
success, result = safe_execute(analyzer.analyze_image, image) | |
if success and result[1].get("error") is None: | |
return result | |
else: | |
# Fallback with professional_photography.py | |
logger.warning(f"Primary model failed, using professional fallback") | |
fallback_analyzer = self.get_analyzer("fallback") | |
fallback_success, fallback_result = safe_execute(fallback_analyzer.analyze_image, image) | |
if fallback_success: | |
return fallback_result | |
else: | |
return "All analyzers failed", {"error": "Complete analysis failure"} | |
def cleanup_all(self) -> None: | |
"""Clean up all model resources""" | |
for analyzer in self.analyzers.values(): | |
analyzer.cleanup() | |
self.analyzers.clear() | |
clean_memory() | |
logger.info("All analyzers cleaned up") | |
# Global model manager instance | |
model_manager = ModelManager(preferred_model="bagel-professional") | |
def analyze_image(image: Image.Image, model_name: str = None, analysis_type: str = "multiengine") -> Tuple[str, Dict[str, Any]]: | |
""" | |
Enhanced convenience function for professional cinematography analysis | |
Args: | |
image: PIL Image to analyze | |
model_name: Optional model name ("bagel-professional", "fallback") | |
analysis_type: Type of analysis ("multiengine", "cinematic", "flux") | |
Returns: | |
Tuple of (description, metadata) with professional cinematography enhancement | |
""" | |
return model_manager.analyze_image(image, model_name, analysis_type) | |
# Export main components | |
__all__ = [ | |
"BaseImageAnalyzer", | |
"BagelAPIAnalyzer", | |
"FallbackAnalyzer", | |
"ModelManager", | |
"model_manager", | |
"analyze_image" | |
] |