# Phramer_AI / models.py
# Author: Malaji71
# Commit: "Update models.py" (96bcb6f, verified)
"""
Model management for Phramer AI
By Pariente AI, for MIA TV Series
BAGEL 7B integration with professional photography knowledge enhancement
"""
import spaces
import logging
import tempfile
import os
import re
from typing import Optional, Dict, Any, Tuple
from PIL import Image
from gradio_client import Client, handle_file
from config import get_device_config, PROFESSIONAL_PHOTOGRAPHY_CONFIG
from utils import clean_memory, safe_execute
from professional_photography import (
ProfessionalPhotoAnalyzer,
enhance_flux_prompt_with_professional_knowledge,
professional_analyzer,
export_professional_prompt_enhancement
)
# Module-level logger named after this module; used throughout this file.
logger = logging.getLogger(__name__)
class BaseImageAnalyzer:
    """Shared interface for the image analysis backends in this module.

    ``initialize`` and ``analyze_image`` raise ``NotImplementedError`` here
    and are meant to be overridden; ``cleanup`` has a default implementation.
    """

    def __init__(self):
        # Nothing is loaded at construction time.
        self.is_initialized = False
        # Device settings as reported by the project's config helper.
        self.device_config = get_device_config()

    def initialize(self) -> bool:
        """Prepare the backend for use; subclasses return True on success."""
        raise NotImplementedError()

    def analyze_image(self, image: Image.Image) -> Tuple[str, Dict[str, Any]]:
        """Describe ``image``; subclasses return ``(description, metadata)``."""
        raise NotImplementedError()

    def cleanup(self) -> None:
        """Release resources by delegating to ``clean_memory``."""
        clean_memory()
class BagelAPIAnalyzer(BaseImageAnalyzer):
    """BAGEL 7B model with professional photography knowledge integration.

    Sends images to the ``Malaji71/Bagel-7B-Demo`` Space through
    ``gradio_client`` and returns the generated text together with a
    metadata dictionary describing the call.
    """

    def __init__(self):
        """Store connection settings; the API client is created lazily."""
        super().__init__()
        self.client = None
        self.space_url = "Malaji71/Bagel-7B-Demo"
        self.api_endpoint = "/image_understanding"
        # Optional token; when unset the Space is accessed without one.
        self.hf_token = os.getenv("HF_TOKEN")
        self.professional_analyzer = professional_analyzer

    def initialize(self) -> bool:
        """Initialize BAGEL API client with authentication.

        Tries the token (when ``HF_TOKEN`` is set) first and, if that attempt
        fails, retries once without it.

        Returns:
            True when a client is available, False otherwise.
        """
        if self.is_initialized:
            return True

        try:
            logger.info("Initializing BAGEL API client for Phramer AI...")
            # Initialize client with token if available
            if self.hf_token:
                logger.info("Using HF token for enhanced API access")
                self.client = Client(self.space_url, hf_token=self.hf_token)
            else:
                logger.info("Using public API access")
                self.client = Client(self.space_url)
            self.is_initialized = True
            logger.info("BAGEL API client initialized successfully")
            return True
        except Exception as e:
            logger.error(f"BAGEL API client initialization failed: {e}")
            # A second attempt only makes sense if the first one used a token.
            if self.hf_token:
                logger.info("Retrying without token...")
                try:
                    self.client = Client(self.space_url)
                    self.is_initialized = True
                    logger.info("BAGEL API client initialized (fallback mode)")
                    return True
                except Exception as e2:
                    logger.error(f"Fallback initialization failed: {e2}")
            return False

    def _get_professional_prompt(self, analysis_type: str = "multimodal") -> str:
        """Get professional prompt that teaches BAGEL to use the knowledge base.

        Args:
            analysis_type: Accepted for interface compatibility; it is
                currently ignored, so every analysis type gets the same prompt.

        Returns:
            A prompt built from ``EXPERT_PHOTOGRAPHY_KNOWLEDGE`` when that can
            be imported, otherwise a static prompt with built-in option lists.
        """
        try:
            # Import the complete knowledge base
            from professional_photography import EXPERT_PHOTOGRAPHY_KNOWLEDGE

            def section(key: str):
                # Missing sections render as an empty dict, as before.
                return EXPERT_PHOTOGRAPHY_KNOWLEDGE.get(key, {})

            # Create the teaching prompt with the complete structure
            lines = [
                "Analyze this image using complete professional cinematography knowledge.",
                "STRUCTURE: [PLANE] of [SUBJECT] [ACTION] [CONTEXT], [LIGHTING], [COMPOSITION], shot on [CAMERA], [LENS], [SETTINGS]",
                f"PLANE: From {section('photographic_planes')}",
                "SUBJECT + ACTION: Define accurately what you see",
                "CONTEXT: Define the environment accurately",
                f"LIGHTING: From {section('lighting_principles')}",
                f"COMPOSITION: From {section('composition_rules')}",
                f"CAMERA ANGLES: From {section('camera_angles')}",
                f"TECHNICAL SETUP: From {section('scene_types')}",
                "Complete the structure using the appropriate elements from each section.",
            ]
            return "\n".join(lines)
        except Exception as e:
            logger.warning(f"Professional knowledge base access failed: {e}")
            fallback_lines = [
                "Analyze this image using complete professional cinematography knowledge.",
                "STRUCTURE: [PLANE] of [SUBJECT] [ACTION] [CONTEXT], [LIGHTING], [COMPOSITION], shot on [CAMERA], [LENS], [SETTINGS]",
                "PLANE: wide_shot, medium_shot, close_up, extreme_wide_shot, extreme_close_up, detail_shot",
                "SUBJECT + ACTION: Define accurately what you see",
                "CONTEXT: Define the environment accurately",
                "LIGHTING: golden_hour, natural_daylight, dramatic_lighting, soft_natural, blue_hour, studio_lighting",
                "COMPOSITION: rule_of_thirds, leading_lines, symmetrical, centered_composition, dynamic_composition",
                "TECHNICAL SETUP: Professional camera and lens specifications",
                "Complete the structure using the appropriate elements.",
            ]
            return "\n".join(fallback_lines)

    def _save_temp_image(self, image: Image.Image) -> Optional[str]:
        """Save image to temporary file for API call.

        Args:
            image: Image to write; converted to RGB first when needed.

        Returns:
            Path of the PNG file, or None when the image could not be saved.
            The caller is responsible for deleting the returned file.
        """
        temp_path = None
        try:
            # delete=False keeps the file on disk after the handle is closed.
            with tempfile.NamedTemporaryFile(delete=False, suffix='.png') as temp_file:
                temp_path = temp_file.name
            if image.mode != 'RGB':
                image = image.convert('RGB')
            image.save(temp_path, 'PNG')
            return temp_path
        except Exception as e:
            logger.error(f"Failed to save temporary image: {e}")
            # Do not leave a half-written file behind when saving fails.
            self._cleanup_temp_file(temp_path)
            return None

    def _cleanup_temp_file(self, file_path: Optional[str]):
        """Clean up temporary file; a falsy or missing path is ignored."""
        try:
            if file_path and os.path.exists(file_path):
                os.unlink(file_path)
        except Exception as e:
            logger.warning(f"Failed to cleanup temp file: {e}")

    @spaces.GPU(duration=60)
    def analyze_image(self, image: Image.Image, prompt: Optional[str] = None) -> Tuple[str, Dict[str, Any]]:
        """Analyze image using BAGEL API with professional cinematography enhancement.

        Args:
            image: Image to analyze.
            prompt: Prompt to send; when None the prompt from
                ``_get_professional_prompt`` is used.

        Returns:
            ``(description, metadata)``. On failure the metadata contains an
            ``"error"`` key and the description is a short status message.
        """
        if not self.is_initialized:
            success = self.initialize()
            if not success:
                return "BAGEL API not available", {"error": "API initialization failed"}

        temp_path = None
        metadata = {
            "model": "BAGEL-7B-Professional",
            "device": "api",
            "confidence": 0.9,
            "api_endpoint": self.api_endpoint,
            "space_url": self.space_url,
            "prompt_used": prompt,
            "has_camera_suggestion": False,
            "professional_enhancement": True
        }

        try:
            # Use professional prompt created by professional_photography.py
            if prompt is None:
                prompt = self._get_professional_prompt("multimodal")
                # Record the prompt that is actually sent rather than None.
                metadata["prompt_used"] = prompt

            # Save image to temporary file
            temp_path = self._save_temp_image(image)
            if not temp_path:
                return "Image processing failed", {"error": "Could not save image"}

            logger.info("Calling BAGEL API with professional_photography.py prompt...")
            # Call BAGEL API with professional prompt - FORCE NEW READING
            result = self.client.predict(
                image=handle_file(temp_path),
                prompt=prompt,
                show_thinking=False,
                do_sample=True,  # Allow creativity and variation
                text_temperature=0.8,  # Higher temperature for different responses each time
                max_new_tokens=1024,  # More tokens for detailed analysis
                api_name=self.api_endpoint
            )

            # Extract response without filtering.
            # NOTE(review): for tuple results the second element is preferred
            # and the first is the fallback - confirm against the Space's outputs.
            if isinstance(result, tuple) and len(result) >= 2:
                description = result[1] if result[1] else result[0]
            else:
                description = str(result)

            if isinstance(description, str) and description.strip():
                description = description.strip()
                # Extract camera setup if present
                if "CAMERA_SETUP:" in description or "2. CAMERA_SETUP" in description:
                    metadata["has_camera_suggestion"] = True
                    logger.info("BAGEL provided camera setup recommendation")
                else:
                    metadata["has_camera_suggestion"] = False
                # Mark as cinematography enhanced
                metadata["cinematography_context_applied"] = True
            else:
                # Empty or non-text response: return a placeholder description.
                description = "Professional cinematographic analysis completed"
                metadata["has_camera_suggestion"] = False

            # Update metadata
            metadata.update({
                "response_length": len(description),
                "analysis_type": "professional_enhanced"
            })
            logger.info(f"BAGEL Professional analysis complete: {len(description)} chars")
            return description, metadata
        except Exception as e:
            logger.error(f"BAGEL Professional analysis failed: {e}")
            return "Professional analysis failed", {"error": str(e), "model": "BAGEL-7B-Professional"}
        finally:
            # Runs on every exit path once a temp file has been created.
            if temp_path:
                self._cleanup_temp_file(temp_path)

    def analyze_for_cinematic_prompt(self, image: Image.Image) -> Tuple[str, Dict[str, Any]]:
        """Analyze image specifically for cinematic/MIA TV Series prompt generation."""
        cinematic_prompt = self._get_professional_prompt("cinematic")
        return self.analyze_image(image, cinematic_prompt)

    def analyze_for_flux_with_professional_context(self, image: Image.Image) -> Tuple[str, Dict[str, Any]]:
        """Analyze image for FLUX with enhanced professional cinematography context."""
        flux_prompt = self._get_professional_prompt("flux_optimized")
        return self.analyze_image(image, flux_prompt)

    def analyze_for_multiengine_prompt(self, image: Image.Image) -> Tuple[str, Dict[str, Any]]:
        """Analyze image for multi-engine compatibility (Flux, Midjourney, etc.)."""
        multiengine_prompt = self._get_professional_prompt("multimodal")
        return self.analyze_image(image, multiengine_prompt)

    def cleanup(self) -> None:
        """Clean up API client resources.

        Drops the client and marks the analyzer as uninitialized so the next
        ``analyze_image`` call creates a fresh client instead of using None.
        """
        try:
            if hasattr(self, 'client'):
                self.client = None
            self.is_initialized = False
            super().cleanup()
            logger.info("BAGEL Professional API resources cleaned up")
        except Exception as e:
            logger.warning(f"BAGEL Professional API cleanup warning: {e}")
class FallbackAnalyzer(BaseImageAnalyzer):
    """Analyzer of last resort that derives a description from image shape.

    It classifies the image by aspect ratio and, when the helpers in
    ``professional_photography`` work, enriches the result with them;
    otherwise it falls back to fixed descriptions and camera setups.
    """

    # Aspect-ratio bucket -> (scene keywords, base description, static camera setup)
    _PRESETS = {
        "wide": (
            ("landscape", "outdoor", "wide"),
            "Wide shot composition with natural lighting and balanced framing",
            "shot on Phase One XT, 24-70mm f/4 lens, ISO 100",
        ),
        "tall": (
            ("portrait", "person", "face"),
            "Portrait composition with professional lighting and sharp focus",
            "shot on Canon EOS R5, 85mm f/1.4 lens, ISO 200",
        ),
        "balanced": (
            ("general", "balanced"),
            "Balanced composition with professional execution",
            "shot on Canon EOS R6, 50mm f/1.8 lens, ISO 400",
        ),
    }

    def __init__(self):
        super().__init__()
        self.professional_analyzer = professional_analyzer

    def initialize(self) -> bool:
        """Mark the analyzer ready; there is nothing to load."""
        self.is_initialized = True
        return True

    @staticmethod
    def _bucket_for(aspect_ratio) -> str:
        """Map a width/height ratio onto one of the ``_PRESETS`` keys."""
        if aspect_ratio > 1.5:
            return "wide"
        if aspect_ratio < 0.75:
            return "tall"
        return "balanced"

    def analyze_image(self, image: Image.Image) -> Tuple[str, Dict[str, Any]]:
        """Return ``(description, metadata)`` derived from the image's dimensions.

        Any unexpected failure yields a generic description and a metadata
        dictionary carrying an ``"error"`` entry.
        """
        try:
            width, height = image.size
            aspect_ratio = width / height

            try:
                # Preferred path: helpers from professional_photography.py
                from professional_photography import get_professional_camera_setup
                keywords, base_text, _ = self._PRESETS[self._bucket_for(aspect_ratio)]
                camera_config = get_professional_camera_setup(" ".join(keywords))
                camera_setup = f"shot on {camera_config.get('camera', 'Canon EOS R6')}, {camera_config.get('lens', '50mm f/1.8')}, ISO {camera_config.get('iso', '400')}"
                from professional_photography import enhance_flux_prompt_with_professional_knowledge
                description = enhance_flux_prompt_with_professional_knowledge(base_text)
            except Exception as e:
                logger.warning(f"Professional enhancement failed in fallback: {e}")
                # Static values replace anything the failed attempt produced.
                _, description, camera_setup = self._PRESETS[self._bucket_for(aspect_ratio)]

            metadata = {
                "model": "Professional-Fallback",
                "device": "cpu",
                "confidence": 0.7,
                "image_size": f"{width}x{height}",
                "aspect_ratio": round(aspect_ratio, 2),
                "has_camera_suggestion": True,
                "camera_setup": camera_setup,
                "professional_enhancement": True,
                "cinematography_context_applied": True
            }
            return description, metadata
        except Exception as e:
            logger.error(f"Professional fallback analysis failed: {e}")
            failure_info = {
                "error": str(e),
                "model": "Professional-Fallback"
            }
            return "Professional cinematographic analysis", failure_info
class ModelManager:
    """Creates, caches and dispatches to the image analyzers of this module."""

    # Names that map onto the BAGEL API analyzer.
    _BAGEL_NAMES = ("bagel-api", "bagel-professional")

    # (analysis type, analyzer method) pairs, checked in this order.
    _SPECIALIZED = (
        ("cinematic", "analyze_for_cinematic_prompt"),
        ("flux", "analyze_for_flux_with_professional_context"),
        ("multiengine", "analyze_for_multiengine_prompt"),
    )

    def __init__(self, preferred_model: str = "bagel-professional"):
        self.preferred_model = preferred_model
        # Cache of analyzer instances keyed by model name.
        self.analyzers = {}
        # Initialised to None; not reassigned anywhere in this class.
        self.current_analyzer = None

    def get_analyzer(self, model_name: str = None) -> Optional[BaseImageAnalyzer]:
        """Return the cached analyzer for ``model_name``, creating it on demand.

        A falsy name selects ``preferred_model``; an unrecognised name is
        logged and served by a newly created fallback analyzer.
        """
        model_name = model_name or self.preferred_model
        if model_name in self.analyzers:
            return self.analyzers[model_name]

        if model_name in self._BAGEL_NAMES:
            self.analyzers[model_name] = BagelAPIAnalyzer()
        elif model_name == "fallback":
            self.analyzers[model_name] = FallbackAnalyzer()
        else:
            logger.warning(f"Unknown model: {model_name}, using professional fallback")
            model_name = "fallback"
            self.analyzers[model_name] = FallbackAnalyzer()
        return self.analyzers[model_name]

    def analyze_image(self, image: Image.Image, model_name: str = None, analysis_type: str = "multiengine") -> Tuple[str, Dict[str, Any]]:
        """Run the requested analysis, retrying with the fallback analyzer on failure.

        The specialised method for ``analysis_type`` is used when the analyzer
        has it; otherwise its plain ``analyze_image`` is called.
        """
        analyzer = self.get_analyzer(model_name)
        if analyzer is None:
            return "No analyzer available", {"error": "Model not found"}

        # Pick the first specialised method that matches type and capability.
        for kind, method_name in self._SPECIALIZED:
            if analysis_type == kind and hasattr(analyzer, method_name):
                target = getattr(analyzer, method_name)
                break
        else:
            target = analyzer.analyze_image

        success, result = safe_execute(target, image)
        if success and result[1].get("error") is None:
            return result

        # Primary attempt failed or reported an error: use the fallback analyzer.
        logger.warning("Primary model failed, using professional fallback")
        fallback_analyzer = self.get_analyzer("fallback")
        fallback_success, fallback_result = safe_execute(fallback_analyzer.analyze_image, image)
        if fallback_success:
            return fallback_result
        return "All analyzers failed", {"error": "Complete analysis failure"}

    def cleanup_all(self) -> None:
        """Clean up every cached analyzer, then empty the cache."""
        for cached in self.analyzers.values():
            cached.cleanup()
        self.analyzers.clear()
        clean_memory()
        logger.info("All analyzers cleaned up")
# Global model manager instance, created at import time; the module-level
# analyze_image() function delegates to it. Prefers "bagel-professional".
model_manager = ModelManager(preferred_model="bagel-professional")
def analyze_image(image: Image.Image, model_name: str = None, analysis_type: str = "multiengine") -> Tuple[str, Dict[str, Any]]:
    """
    Analyze an image through the module-level ``model_manager``.

    Args:
        image: PIL Image to analyze
        model_name: Optional model name ("bagel-professional", "fallback")
        analysis_type: Type of analysis ("multiengine", "cinematic", "flux")

    Returns:
        Tuple of (description, metadata) exactly as produced by
        ``model_manager.analyze_image``
    """
    outcome = model_manager.analyze_image(image, model_name, analysis_type)
    return outcome
# Export main components: the names made available by
# `from models import *` (analyzer classes, the manager class, the shared
# manager instance and the convenience function).
__all__ = [
    "BaseImageAnalyzer",
    "BagelAPIAnalyzer",
    "FallbackAnalyzer",
    "ModelManager",
    "model_manager",
    "analyze_image"
]