Spaces:
Running
on
Zero
Running
on
Zero
""" | |
Model management for Phramer AI | |
By Pariente AI, for MIA TV Series | |
BAGEL 7B integration with professional photography knowledge enhancement | |
""" | |
import spaces | |
import logging | |
import tempfile | |
import os | |
import re | |
from typing import Optional, Dict, Any, Tuple | |
from PIL import Image | |
from gradio_client import Client, handle_file | |
from config import get_device_config, PROFESSIONAL_PHOTOGRAPHY_CONFIG | |
from utils import clean_memory, safe_execute | |
from professional_photography import ( | |
ProfessionalPhotoAnalyzer, | |
enhance_flux_prompt_with_professional_knowledge, | |
professional_analyzer, | |
export_professional_prompt_enhancement | |
) | |
logger = logging.getLogger(__name__) | |
class BaseImageAnalyzer: | |
"""Base class for image analysis models""" | |
def __init__(self): | |
self.is_initialized = False | |
self.device_config = get_device_config() | |
def initialize(self) -> bool: | |
"""Initialize the model""" | |
raise NotImplementedError | |
def analyze_image(self, image: Image.Image) -> Tuple[str, Dict[str, Any]]: | |
"""Analyze image and return description""" | |
raise NotImplementedError | |
def cleanup(self) -> None: | |
"""Clean up model resources""" | |
clean_memory() | |
class BagelAPIAnalyzer(BaseImageAnalyzer): | |
"""BAGEL 7B model with professional photography knowledge integration""" | |
def __init__(self): | |
super().__init__() | |
self.client = None | |
self.space_url = "Malaji71/Bagel-7B-Demo" | |
self.api_endpoint = "/image_understanding" | |
self.hf_token = os.getenv("HF_TOKEN") | |
self.professional_analyzer = professional_analyzer | |
def initialize(self) -> bool: | |
"""Initialize BAGEL API client with authentication""" | |
if self.is_initialized: | |
return True | |
try: | |
logger.info("Initializing BAGEL API client for Phramer AI...") | |
# Initialize client with token if available | |
if self.hf_token: | |
logger.info("Using HF token for enhanced API access") | |
self.client = Client(self.space_url, hf_token=self.hf_token) | |
else: | |
logger.info("Using public API access") | |
self.client = Client(self.space_url) | |
self.is_initialized = True | |
logger.info("BAGEL API client initialized successfully") | |
return True | |
except Exception as e: | |
logger.error(f"BAGEL API client initialization failed: {e}") | |
if self.hf_token: | |
logger.info("Retrying without token...") | |
try: | |
self.client = Client(self.space_url) | |
self.is_initialized = True | |
logger.info("BAGEL API client initialized (fallback mode)") | |
return True | |
except Exception as e2: | |
logger.error(f"Fallback initialization failed: {e2}") | |
return False | |
def _get_professional_prompt(self, analysis_type: str = "multimodal") -> str: | |
"""Get professional prompt created by professional_photography.py""" | |
try: | |
# Let professional_photography.py create the complete prompt with full knowledge base | |
enhanced_context = self.professional_analyzer.generate_enhanced_context("") | |
if analysis_type == "cinematic": | |
# Use the professional analyzer to format cinematic prompt | |
prompt = self.professional_analyzer.format_bagel_enhancement_prompt( | |
base_prompt="Analyze this image with complete professional cinematography expertise for cinema-quality generation.", | |
enhanced_context=enhanced_context | |
) | |
elif analysis_type == "flux_optimized": | |
# Use the professional analyzer to format FLUX prompt | |
prompt = self.professional_analyzer.format_bagel_enhancement_prompt( | |
base_prompt="Analyze this image with complete professional photography knowledge for photorealistic prompt generation.", | |
enhanced_context=enhanced_context | |
) | |
else: | |
# Use the professional analyzer to format multimodal prompt | |
prompt = self.professional_analyzer.format_bagel_enhancement_prompt( | |
base_prompt="Analyze this image with complete professional cinematography knowledge for multi-engine prompt generation.", | |
enhanced_context=enhanced_context | |
) | |
return prompt | |
except Exception as e: | |
logger.warning(f"Professional prompt generation failed: {e}") | |
# Fallback to basic professional prompt | |
return """Analyze this image using complete professional cinematography expertise. Provide exactly two sections: | |
1. DESCRIPTION: Complete professional visual analysis using your full cinematography knowledge - identify the photographic plane (wide shot, close-up, medium shot), camera angle (eye level, low angle, high angle), lighting type (golden hour, natural daylight, dramatic), and composition technique (rule of thirds, leading lines, symmetrical). Describe the specific subject, action, and context in detail. | |
2. CAMERA_SETUP: Recommend specific professional equipment based on your scene analysis - camera body, lens specifications, aperture settings, ISO, and any special techniques. | |
Apply your complete professional cinematography knowledge.""" | |
def _save_temp_image(self, image: Image.Image) -> str: | |
"""Save image to temporary file for API call""" | |
try: | |
temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.png') | |
temp_path = temp_file.name | |
temp_file.close() | |
if image.mode != 'RGB': | |
image = image.convert('RGB') | |
image.save(temp_path, 'PNG') | |
return temp_path | |
except Exception as e: | |
logger.error(f"Failed to save temporary image: {e}") | |
return None | |
def _cleanup_temp_file(self, file_path: str): | |
"""Clean up temporary file""" | |
try: | |
if file_path and os.path.exists(file_path): | |
os.unlink(file_path) | |
except Exception as e: | |
logger.warning(f"Failed to cleanup temp file: {e}") | |
def analyze_image(self, image: Image.Image, prompt: str = None) -> Tuple[str, Dict[str, Any]]: | |
"""Analyze image using BAGEL API with professional cinematography enhancement""" | |
if not self.is_initialized: | |
success = self.initialize() | |
if not success: | |
return "BAGEL API not available", {"error": "API initialization failed"} | |
temp_path = None | |
metadata = { | |
"model": "BAGEL-7B-Professional", | |
"device": "api", | |
"confidence": 0.9, | |
"api_endpoint": self.api_endpoint, | |
"space_url": self.space_url, | |
"prompt_used": prompt, | |
"has_camera_suggestion": False, | |
"professional_enhancement": True | |
} | |
try: | |
# Use professional prompt created by professional_photography.py | |
if prompt is None: | |
prompt = self._get_professional_prompt("multimodal") | |
# Save image to temporary file | |
temp_path = self._save_temp_image(image) | |
if not temp_path: | |
return "Image processing failed", {"error": "Could not save image"} | |
logger.info("Calling BAGEL API with professional_photography.py prompt...") | |
# Call BAGEL API with professional prompt - let it do its work | |
result = self.client.predict( | |
image=handle_file(temp_path), | |
prompt=prompt, | |
show_thinking=False, | |
do_sample=True, # Allow creativity | |
text_temperature=0.7, # Higher temperature for richer descriptions | |
max_new_tokens=1024, # More tokens for detailed analysis | |
api_name=self.api_endpoint | |
) | |
# Extract response without filtering | |
if isinstance(result, tuple) and len(result) >= 2: | |
description = result[1] if result[1] else result[0] | |
else: | |
description = str(result) | |
if isinstance(description, str) and description.strip(): | |
description = description.strip() | |
# Extract camera setup if present | |
if "CAMERA_SETUP:" in description or "2. CAMERA_SETUP" in description: | |
metadata["has_camera_suggestion"] = True | |
logger.info("BAGEL provided camera setup recommendation") | |
else: | |
metadata["has_camera_suggestion"] = False | |
# Mark as cinematography enhanced | |
metadata["cinematography_context_applied"] = True | |
else: | |
description = "Professional cinematographic analysis completed" | |
metadata["has_camera_suggestion"] = False | |
# Update metadata | |
metadata.update({ | |
"response_length": len(description), | |
"analysis_type": "professional_enhanced" | |
}) | |
logger.info(f"BAGEL Professional analysis complete: {len(description)} chars") | |
return description, metadata | |
except Exception as e: | |
logger.error(f"BAGEL Professional analysis failed: {e}") | |
return "Professional analysis failed", {"error": str(e), "model": "BAGEL-7B-Professional"} | |
finally: | |
if temp_path: | |
self._cleanup_temp_file(temp_path) | |
def analyze_for_cinematic_prompt(self, image: Image.Image) -> Tuple[str, Dict[str, Any]]: | |
"""Analyze image specifically for cinematic/MIA TV Series prompt generation""" | |
cinematic_prompt = self._get_professional_prompt("cinematic") | |
return self.analyze_image(image, cinematic_prompt) | |
def analyze_for_flux_with_professional_context(self, image: Image.Image) -> Tuple[str, Dict[str, Any]]: | |
"""Analyze image for FLUX with enhanced professional cinematography context""" | |
flux_prompt = self._get_professional_prompt("flux_optimized") | |
return self.analyze_image(image, flux_prompt) | |
def analyze_for_multiengine_prompt(self, image: Image.Image) -> Tuple[str, Dict[str, Any]]: | |
"""Analyze image for multi-engine compatibility (Flux, Midjourney, etc.)""" | |
multiengine_prompt = self._get_professional_prompt("multimodal") | |
return self.analyze_image(image, multiengine_prompt) | |
def cleanup(self) -> None: | |
"""Clean up API client resources""" | |
try: | |
if hasattr(self, 'client'): | |
self.client = None | |
super().cleanup() | |
logger.info("BAGEL Professional API resources cleaned up") | |
except Exception as e: | |
logger.warning(f"BAGEL Professional API cleanup warning: {e}") | |
class FallbackAnalyzer(BaseImageAnalyzer): | |
"""Enhanced fallback analyzer using professional_photography.py knowledge""" | |
def __init__(self): | |
super().__init__() | |
self.professional_analyzer = professional_analyzer | |
def initialize(self) -> bool: | |
"""Fallback with cinematography enhancement is always ready""" | |
self.is_initialized = True | |
return True | |
def analyze_image(self, image: Image.Image) -> Tuple[str, Dict[str, Any]]: | |
"""Provide enhanced image description using professional_photography.py""" | |
try: | |
width, height = image.size | |
aspect_ratio = width / height | |
# Use professional_photography.py to analyze basic image properties | |
basic_description = f"Professional photograph with {width}x{height} resolution, aspect ratio {aspect_ratio:.2f}" | |
# Let professional_photography.py enhance this | |
try: | |
enhancement_result = export_professional_prompt_enhancement( | |
bagel_output=basic_description, | |
bagel_metadata={"image_size": f"{width}x{height}", "aspect_ratio": aspect_ratio} | |
) | |
description = enhancement_result["enhanced_prompt"] | |
camera_setup = enhancement_result["metadata"].get("technical_context", "") | |
except Exception as e: | |
logger.warning(f"Professional enhancement failed in fallback: {e}") | |
# Basic fallback | |
if aspect_ratio > 1.5: | |
description = "Wide shot composition with natural lighting and balanced framing" | |
camera_setup = "shot on Phase One XT, 24-70mm f/4 lens, ISO 100" | |
elif aspect_ratio < 0.75: | |
description = "Portrait composition with professional lighting and sharp focus" | |
camera_setup = "shot on Canon EOS R5, 85mm f/1.4 lens, ISO 200" | |
else: | |
description = "Balanced composition with professional execution" | |
camera_setup = "shot on Canon EOS R6, 50mm f/1.8 lens, ISO 400" | |
metadata = { | |
"model": "Professional-Fallback", | |
"device": "cpu", | |
"confidence": 0.7, | |
"image_size": f"{width}x{height}", | |
"aspect_ratio": round(aspect_ratio, 2), | |
"has_camera_suggestion": True, | |
"camera_setup": camera_setup, | |
"professional_enhancement": True, | |
"cinematography_context_applied": True | |
} | |
return description, metadata | |
except Exception as e: | |
logger.error(f"Professional fallback analysis failed: {e}") | |
return "Professional cinematographic analysis", { | |
"error": str(e), | |
"model": "Professional-Fallback" | |
} | |
class ModelManager: | |
"""Enhanced manager for handling image analysis models with professional cinematography integration""" | |
def __init__(self, preferred_model: str = "bagel-professional"): | |
self.preferred_model = preferred_model | |
self.analyzers = {} | |
self.current_analyzer = None | |
def get_analyzer(self, model_name: str = None) -> Optional[BaseImageAnalyzer]: | |
"""Get or create analyzer for specified model""" | |
model_name = model_name or self.preferred_model | |
if model_name not in self.analyzers: | |
if model_name in ["bagel-api", "bagel-professional"]: | |
self.analyzers[model_name] = BagelAPIAnalyzer() | |
elif model_name == "fallback": | |
self.analyzers[model_name] = FallbackAnalyzer() | |
else: | |
logger.warning(f"Unknown model: {model_name}, using professional fallback") | |
model_name = "fallback" | |
self.analyzers[model_name] = FallbackAnalyzer() | |
return self.analyzers[model_name] | |
def analyze_image(self, image: Image.Image, model_name: str = None, analysis_type: str = "multiengine") -> Tuple[str, Dict[str, Any]]: | |
"""Analyze image with professional cinematography enhancement""" | |
analyzer = self.get_analyzer(model_name) | |
if analyzer is None: | |
return "No analyzer available", {"error": "Model not found"} | |
# Choose analysis method based on type and analyzer capabilities | |
if analysis_type == "cinematic" and hasattr(analyzer, 'analyze_for_cinematic_prompt'): | |
success, result = safe_execute(analyzer.analyze_for_cinematic_prompt, image) | |
elif analysis_type == "flux" and hasattr(analyzer, 'analyze_for_flux_with_professional_context'): | |
success, result = safe_execute(analyzer.analyze_for_flux_with_professional_context, image) | |
elif analysis_type == "multiengine" and hasattr(analyzer, 'analyze_for_multiengine_prompt'): | |
success, result = safe_execute(analyzer.analyze_for_multiengine_prompt, image) | |
else: | |
success, result = safe_execute(analyzer.analyze_image, image) | |
if success and result[1].get("error") is None: | |
return result | |
else: | |
# Fallback with professional_photography.py | |
logger.warning(f"Primary model failed, using professional fallback") | |
fallback_analyzer = self.get_analyzer("fallback") | |
fallback_success, fallback_result = safe_execute(fallback_analyzer.analyze_image, image) | |
if fallback_success: | |
return fallback_result | |
else: | |
return "All analyzers failed", {"error": "Complete analysis failure"} | |
def cleanup_all(self) -> None: | |
"""Clean up all model resources""" | |
for analyzer in self.analyzers.values(): | |
analyzer.cleanup() | |
self.analyzers.clear() | |
clean_memory() | |
logger.info("All analyzers cleaned up") | |
# Global model manager instance | |
model_manager = ModelManager(preferred_model="bagel-professional") | |
def analyze_image(image: Image.Image, model_name: str = None, analysis_type: str = "multiengine") -> Tuple[str, Dict[str, Any]]: | |
""" | |
Enhanced convenience function for professional cinematography analysis | |
Args: | |
image: PIL Image to analyze | |
model_name: Optional model name ("bagel-professional", "fallback") | |
analysis_type: Type of analysis ("multiengine", "cinematic", "flux") | |
Returns: | |
Tuple of (description, metadata) with professional cinematography enhancement | |
""" | |
return model_manager.analyze_image(image, model_name, analysis_type) | |
# Export main components | |
__all__ = [ | |
"BaseImageAnalyzer", | |
"BagelAPIAnalyzer", | |
"FallbackAnalyzer", | |
"ModelManager", | |
"model_manager", | |
"analyze_image" | |
] |