# Phramer_AI / utils.py
# Hugging Face file-viewer residue kept for provenance:
# author Malaji71, commit f746366 ("Update utils.py", verified), 22.5 kB
"""
Utility functions for Phramer AI
By Pariente AI, for MIA TV Series
Enhanced with professional cinematography knowledge and multi-engine optimization
"""
import re
import logging
import gc
from typing import Optional, Tuple, Dict, Any, List
from PIL import Image
import torch
import numpy as np
from config import PROCESSING_CONFIG, FLUX_RULES, PROFESSIONAL_PHOTOGRAPHY_CONFIG
# Configure logging
# NOTE: this runs at import time; basicConfig only installs a handler when the
# root logger does not already have one.
logging.basicConfig(level=logging.INFO)
# Module-level logger used by the helpers in this file.
logger = logging.getLogger(__name__)
def setup_logging(level: str = "INFO") -> None:
    """
    Setup logging configuration

    Args:
        level: Name of a standard logging level ("DEBUG", "INFO", ...),
            matched case-insensitively.

    Raises:
        AttributeError: If ``level`` does not name an attribute of ``logging``.
    """
    logging.basicConfig(
        level=getattr(logging, level.upper()),
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        # basicConfig() is a no-op once the root logger has handlers, and this
        # module already calls it at import time. force=True replaces the
        # existing root handlers so the requested level/format take effect.
        force=True,
    )
def optimize_image(image: Any) -> Optional[Image.Image]:
    """
    Optimize image for processing

    Converts the input to an RGB PIL image and shrinks it so that neither side
    exceeds PROCESSING_CONFIG["max_image_size"]. The caller's image object is
    never resized in place.

    Args:
        image: Input image (PIL, numpy array, or file path)

    Returns:
        Optimized PIL Image or None if failed
    """
    if image is None:
        return None
    try:
        # True while `image` is still the exact object the caller handed in.
        caller_owned = False

        # Convert to PIL Image if necessary
        if isinstance(image, np.ndarray):
            image = Image.fromarray(image)
        elif isinstance(image, str):
            # convert() returns a loaded copy, so the file handle can be
            # released as soon as the with-block exits.
            with Image.open(image) as opened:
                image = opened.convert('RGB')
        elif isinstance(image, Image.Image):
            caller_owned = True
        else:
            logger.error(f"Unsupported image type: {type(image)}")
            return None

        # Convert to RGB if necessary
        if image.mode != 'RGB':
            image = image.convert('RGB')
            caller_owned = False

        # Resize if too large
        max_size = PROCESSING_CONFIG["max_image_size"]
        if image.size[0] > max_size or image.size[1] > max_size:
            if caller_owned:
                # thumbnail() modifies the image in place; work on a copy so
                # the caller's original keeps its full resolution.
                image = image.copy()
            image.thumbnail((max_size, max_size), Image.Resampling.LANCZOS)
            logger.info(f"Image resized to {image.size}")
        return image
    except Exception as e:
        logger.error(f"Image optimization failed: {e}")
        return None
def validate_image(image: Any) -> bool:
    """
    Report whether an image can be processed.

    An image counts as valid when optimize_image() manages to turn it into a
    PIL image.

    Args:
        image: Input image to validate

    Returns:
        True if valid, False otherwise
    """
    if image is None:
        return False
    try:
        candidate = optimize_image(image)
    except Exception:
        return False
    return candidate is not None
def clean_memory() -> None:
    """Run the garbage collector and, when CUDA is available, empty its cache."""
    try:
        gc.collect()
        cuda = torch.cuda
        if cuda.is_available():
            cuda.empty_cache()
            cuda.synchronize()
        logger.debug("Memory cleaned")
    except Exception as exc:
        # Cleanup is best-effort: report the problem but never raise.
        logger.warning(f"Memory cleanup failed: {exc}")
def detect_scene_type_from_analysis(analysis_metadata: Dict[str, Any]) -> str:
    """
    Detect scene type from BAGEL analysis metadata

    Args:
        analysis_metadata: Metadata dict; the "scene_type" and "camera_setup"
            keys are consulted, both optional.

    Returns:
        The explicit "scene_type" value when one is set, otherwise a scene
        name inferred from keywords in "camera_setup", otherwise "default".
    """
    if not analysis_metadata:
        return "default"
    try:
        # Check if BAGEL provided scene detection (ignore empty/None values)
        if "scene_type" in analysis_metadata and analysis_metadata["scene_type"]:
            return analysis_metadata["scene_type"]

        # Check camera setup for scene hints; a None value is treated as empty
        camera_setup = str(analysis_metadata.get("camera_setup") or "").lower()

        # Checked in order: the first scene with a matching hint wins.
        scene_hints = (
            ("portrait", ("portrait", "85mm", "135mm")),
            ("landscape", ("landscape", "wide", "24mm", "phase one")),
            ("street", ("street", "35mm", "documentary", "leica")),
            ("cinematic", ("cinema", "arri", "anamorphic")),
            ("architectural", ("architecture", "building", "tilt")),
        )
        for scene, terms in scene_hints:
            if any(term in camera_setup for term in terms):
                return scene
            # "red" (the camera brand) must be a whole word, otherwise words
            # such as "centered" or "blurred" are misread as cinema gear.
            if scene == "cinematic" and re.search(r'\bred\b', camera_setup):
                return scene
        return "default"
    except Exception as e:
        logger.warning(f"Scene type detection failed: {e}")
        return "default"
def apply_flux_rules(prompt: str, analysis_metadata: Optional[Dict[str, Any]] = None) -> str:
    """
    Turn a raw analysis prompt into an optimized multi-engine prompt.

    The result is assembled as description + camera + lighting + style and
    then passed through the formatting cleanup.

    Args:
        prompt: Raw prompt text from BAGEL analysis
        analysis_metadata: Enhanced metadata with cinematography suggestions

    Returns:
        Optimized prompt for Flux, Midjourney, and other generative engines
        (empty string when the prompt is empty or not a string)
    """
    if not prompt or not isinstance(prompt, str):
        return ""

    # Strip every unwanted pattern configured in FLUX_RULES
    stripped = prompt
    for unwanted in FLUX_RULES["remove_patterns"]:
        stripped = re.sub(unwanted, '', stripped, flags=re.IGNORECASE)

    # Keep the description only (the CAMERA_SETUP section is dropped)
    description_part = _extract_description_only(stripped)
    description_lower = description_part.lower()

    has_bagel_camera = (
        analysis_metadata
        and analysis_metadata.get("has_camera_suggestion")
        and analysis_metadata.get("camera_setup")
    )
    if has_bagel_camera:
        # Prefer the camera suggestion that came with the analysis
        bagel_camera = analysis_metadata["camera_setup"]
        scene_type = detect_scene_type_from_analysis(analysis_metadata)
        camera_config = _format_professional_camera_suggestion(bagel_camera, scene_type)
        logger.info(f"Using BAGEL cinematography suggestion: {camera_config}")
    else:
        # Fallback: derive scene and camera from the description itself
        scene_type = _detect_scene_from_description(description_lower)
        camera_config = _get_enhanced_camera_config(scene_type, description_lower)
        logger.info(f"Using enhanced cinematography configuration for {scene_type}")

    lighting_enhancement = _get_cinematography_lighting_enhancement(
        description_lower, camera_config, scene_type
    )
    style_enhancement = _get_style_enhancement(scene_type, description_lower)

    # Description + Camera + Lighting + Style, then tidy the punctuation
    assembled = "".join(
        (description_part, camera_config, lighting_enhancement, style_enhancement)
    )
    return _clean_prompt_formatting(assembled)
def _extract_description_only(prompt: str) -> str:
"""Extract only the description part, removing camera setup sections"""
# Remove CAMERA_SETUP section if present
if "CAMERA_SETUP:" in prompt:
parts = prompt.split("CAMERA_SETUP:")
description = parts[0].strip()
elif "2. CAMERA_SETUP" in prompt:
parts = prompt.split("2. CAMERA_SETUP")
description = parts[0].strip()
else:
description = prompt
# Remove "DESCRIPTION:" label if present
if description.startswith("DESCRIPTION:"):
description = description.replace("DESCRIPTION:", "").strip()
elif description.startswith("1. DESCRIPTION:"):
description = description.replace("1. DESCRIPTION:", "").strip()
# Clean up any remaining camera recommendations from the description
description = re.sub(r'For this type of scene.*?shooting style would be.*?\.', '', description, flags=re.DOTALL)
description = re.sub(r'I would recommend.*?aperture.*?\.', '', description, flags=re.DOTALL)
description = re.sub(r'Professional Context:.*?\.', '', description, flags=re.DOTALL)
description = re.sub(r'Cinematography context:.*?\.', '', description, flags=re.DOTALL)
# Remove numbered section residues
description = re.sub(r'\s*\d+\.\s*,?\s*$', '', description)
description = re.sub(r'\s*\d+\.\s*,?\s*', ' ', description)
return description.strip()
def _detect_scene_from_description(description_lower: str) -> str:
    """
    Pick the best-matching scene type for a lower-cased description.

    Every scene gets one point per configured keyword found in the text;
    "cinematic" and "portrait" can earn a further two-point bonus. The
    highest score wins, and "default" is returned when nothing matched.
    """
    keyword_map = PROFESSIONAL_PHOTOGRAPHY_CONFIG.get("scene_detection_keywords", {})

    # Count keyword hits per scene, keeping only scenes that matched at all
    hit_counts = {
        scene: sum(word in description_lower for word in words)
        for scene, words in keyword_map.items()
    }
    tally = {scene: hits for scene, hits in hit_counts.items() if hits > 0}

    # Cinematography-specific bonuses
    bonus_rules = (
        ("cinematic", ["film", "movie", "cinematic", "dramatic lighting", "anamorphic"]),
        ("portrait", ["studio", "controlled lighting", "professional portrait"]),
    )
    for scene, triggers in bonus_rules:
        if any(trigger in description_lower for trigger in triggers):
            tally[scene] = tally.get(scene, 0) + 2

    if not tally:
        return "default"
    # On ties the scene that entered the tally first is returned
    return max(tally, key=tally.get)
def _format_professional_camera_suggestion(bagel_camera: str, scene_type: str) -> str:
"""Format BAGEL's camera suggestion with enhanced cinematography knowledge"""
try:
camera_text = bagel_camera.strip()
camera_text = re.sub(r'^CAMERA_SETUP:\s*', '', camera_text)
# Enhanced extraction patterns for cinema equipment
cinema_patterns = {
'camera': r'(ARRI [^,]+|RED [^,]+|Canon EOS [^,]+|Sony A[^,]+|Leica [^,]+|Hasselblad [^,]+|Phase One [^,]+)',
'lens': r'(\d+mm[^,]*(?:anamorphic)?[^,]*|[^,]*(?:anamorphic|telephoto|wide-angle)[^,]*)',
'aperture': r'(f/[\d.]+[^,]*)'
}
extracted_parts = []
for key, pattern in cinema_patterns.items():
match = re.search(pattern, camera_text, re.IGNORECASE)
if match:
extracted_parts.append(match.group(1).strip())
if extracted_parts:
camera_info = ', '.join(extracted_parts)
# Add scene-specific enhancement
if scene_type == "cinematic":
return f", Shot on {camera_info}, cinematic photography"
elif scene_type == "portrait":
return f", Shot on {camera_info}, professional portrait photography"
else:
return f", Shot on {camera_info}, professional photography"
else:
return _get_enhanced_camera_config(scene_type, camera_text.lower())
except Exception as e:
logger.warning(f"Failed to format professional camera suggestion: {e}")
return _get_enhanced_camera_config(scene_type, "")
def _get_enhanced_camera_config(scene_type: str, description_lower: str) -> str:
"""Get enhanced camera configuration with cinematography knowledge"""
# Enhanced camera configurations with cinema equipment
enhanced_configs = {
"cinematic": ", Shot on ARRI Alexa LF, 35mm anamorphic lens, cinematic photography",
"portrait": ", Shot on Canon EOS R5, 85mm f/1.4 lens at f/2.8, professional portrait photography",
"landscape": ", Shot on Phase One XT, 24-70mm f/4 lens at f/8, epic landscape photography",
"street": ", Shot on Leica M11, 35mm f/1.4 lens at f/2.8, documentary street photography",
"architectural": ", Shot on Canon EOS R5, 24-70mm f/2.8 lens at f/8, architectural photography",
"commercial": ", Shot on Hasselblad X2D 100C, 90mm f/2.5 lens, commercial photography"
}
# Use enhanced config if available, otherwise fall back to FLUX_RULES
if scene_type in enhanced_configs:
return enhanced_configs[scene_type]
elif scene_type in FLUX_RULES["camera_configs"]:
return FLUX_RULES["camera_configs"][scene_type]
else:
return FLUX_RULES["camera_configs"]["default"]
def _get_cinematography_lighting_enhancement(description_lower: str, camera_config: str, scene_type: str) -> str:
"""Enhanced lighting with cinematography principles"""
# Don't add lighting if already mentioned
if any(term in description_lower for term in ["lighting", "lit", "illuminated"]) or 'lighting' in camera_config.lower():
return ""
# Enhanced lighting based on scene type and cinematography knowledge
if scene_type == "cinematic":
if any(term in description_lower for term in ["dramatic", "moody", "dark"]):
return ", dramatic cinematic lighting with practical lights"
else:
return ", professional cinematic lighting"
elif scene_type == "portrait":
return ", professional studio lighting with subtle rim light"
elif "dramatic" in description_lower or "chaos" in description_lower:
return FLUX_RULES["lighting_enhancements"]["dramatic"]
else:
return FLUX_RULES["lighting_enhancements"]["default"]
def _get_style_enhancement(scene_type: str, description_lower: str) -> str:
    """
    Return the style suffix for a scene, or "" when none applies.

    Cinematic scenes get the cinematic style unless the description already
    mentions film grain; portrait and commercial scenes get the photorealistic
    style; any other scene gets the editorial style when the description
    mentions "editorial".
    """
    style_enhancements = FLUX_RULES.get("style_enhancements", {})

    if scene_type == "cinematic":
        # Cinematic scenes never fall through to the other style rules
        if "film grain" in description_lower:
            return ""
        return ", " + style_enhancements.get("cinematic", "cinematic composition, film grain")

    if scene_type in ("portrait", "commercial"):
        return ", " + style_enhancements.get("photorealistic", "photorealistic, ultra-detailed")

    if "editorial" in description_lower:
        return ", " + style_enhancements.get("editorial", "editorial photography style")

    return ""
def _clean_prompt_formatting(prompt: str) -> str:
"""Clean up prompt formatting"""
if not prompt:
return ""
# Ensure it starts with capital letter
prompt = prompt.strip()
if prompt:
prompt = prompt[0].upper() + prompt[1:] if len(prompt) > 1 else prompt.upper()
# Clean up spaces and commas
prompt = re.sub(r'\s+', ' ', prompt)
prompt = re.sub(r',\s*,+', ',', prompt)
prompt = re.sub(r'^\s*,\s*', '', prompt) # Remove leading commas
prompt = re.sub(r'\s*,\s*$', '', prompt) # Remove trailing commas
# Remove redundant periods
prompt = re.sub(r'\.+', '.', prompt)
return prompt.strip()
def calculate_prompt_score(prompt: str, analysis_data: Optional[Dict[str, Any]] = None) -> Tuple[int, Dict[str, int]]:
    """
    Calculate enhanced quality score with professional cinematography criteria

    Four categories are scored, each capped at 25 points: prompt quality,
    technical details, professional cinematography and multi-engine
    optimization.

    Args:
        prompt: The prompt to score
        analysis_data: Enhanced analysis data with cinematography context

    Returns:
        Tuple of (total_score, breakdown_dict)
    """
    if not prompt:
        return 0, {"prompt_quality": 0, "technical_details": 0, "professional_cinematography": 0, "multi_engine_optimization": 0}

    prompt_lower = prompt.lower()
    breakdown = {}

    # Enhanced Prompt Quality (0-25 points)
    length_score = min(15, len(prompt) // 10)  # Reward appropriate length
    detail_score = min(10, len(prompt.split(',')) * 1.5)  # Reward structured detail
    breakdown["prompt_quality"] = int(length_score + detail_score)

    # Technical Details with Cinematography Focus (0-25 points)
    tech_score = 0

    # Cinema equipment (higher scores for professional gear). Brands are
    # matched at word boundaries: a plain substring test scored "blurred" as a
    # RED camera and "arriving" as an ARRI. The colour word "red" on its own
    # still matches.
    equipment_pattern = r'\b(?:arri(?:flex)?\b|red\b|canon eos r|sony a1|leica|hasselblad|phase one)'
    if re.search(equipment_pattern, prompt_lower):
        tech_score += 6

    # Lens specifications
    if re.search(r'\d+mm.*f/[\d.]+', prompt):
        tech_score += 5

    # Anamorphic and specialized lenses
    if 'anamorphic' in prompt_lower:
        tech_score += 4

    # Professional terminology
    tech_keywords = ['shot on', 'lens', 'photography', 'lighting', 'cinematic']
    tech_score += sum(2 for keyword in tech_keywords if keyword in prompt_lower)

    # Bonus for BAGEL cinematography suggestions
    if analysis_data and analysis_data.get("has_camera_suggestion"):
        tech_score += 8
    breakdown["technical_details"] = min(25, tech_score)

    # Professional Cinematography (0-25 points)
    cinema_score = 0

    # Professional lighting techniques
    lighting_terms = ['cinematic lighting', 'dramatic lighting', 'studio lighting', 'rim light', 'practical lights']
    cinema_score += sum(3 for term in lighting_terms if term in prompt_lower)

    # Composition techniques
    composition_terms = ['composition', 'framing', 'depth of field', 'bokeh', 'rule of thirds']
    cinema_score += sum(2 for term in composition_terms if term in prompt_lower)

    # Cinematography style elements
    style_terms = ['film grain', 'anamorphic', 'telephoto compression', 'wide-angle']
    cinema_score += sum(3 for term in style_terms if term in prompt_lower)

    # Professional context bonus
    if analysis_data and analysis_data.get("cinematography_context_applied"):
        cinema_score += 5
    breakdown["professional_cinematography"] = min(25, cinema_score)

    # Multi-Engine Optimization (0-25 points)
    optimization_score = 0

    # Check for multi-engine compatible elements
    multi_engine_terms = ['photorealistic', 'ultra-detailed', 'professional photography', 'cinematic']
    optimization_score += sum(3 for term in multi_engine_terms if term in prompt_lower)

    # Technical specifications for generation (exact, case-sensitive match
    # against the configured suffixes)
    if any(camera in prompt for camera in FLUX_RULES["camera_configs"].values()):
        optimization_score += 5

    # Lighting configuration
    if any(lighting in prompt for lighting in FLUX_RULES["lighting_enhancements"].values()):
        optimization_score += 4

    # Style enhancements
    if any(style in prompt for style in FLUX_RULES.get("style_enhancements", {}).values()):
        optimization_score += 3
    breakdown["multi_engine_optimization"] = min(25, optimization_score)

    # Total is the plain sum of the four capped categories
    total_score = sum(breakdown.values())
    return total_score, breakdown
def calculate_professional_enhanced_score(prompt: str, analysis_data: Optional[Dict[str, Any]] = None) -> Tuple[int, Dict[str, int]]:
    """
    Score a prompt against the professional cinematography criteria.

    Thin alias that forwards both arguments to calculate_prompt_score().

    Args:
        prompt: The prompt to score
        analysis_data: Analysis data with cinematography context

    Returns:
        Tuple of (total_score, breakdown_dict)
    """
    return calculate_prompt_score(prompt=prompt, analysis_data=analysis_data)
def get_score_grade(score: int) -> Dict[str, str]:
    """
    Get grade information for a score

    Args:
        score: Numeric score

    Returns:
        The grade entry of the highest threshold that ``score`` reaches, or
        the entry of the lowest threshold when it reaches none.
    """
    # Imported here, as in the original, rather than at module level.
    from config import SCORING_CONFIG

    thresholds = SCORING_CONFIG["grade_thresholds"]
    for threshold in sorted(thresholds, reverse=True):
        if score >= threshold:
            return thresholds[threshold]

    # Default to lowest grade. Use the smallest configured threshold instead
    # of assuming that a key 0 exists in the table.
    return thresholds[min(thresholds)]
def format_analysis_report(analysis_data: Dict[str, Any], processing_time: float) -> str:
    """
    Format analysis data into a readable report with cinematography insights

    Args:
        analysis_data: Analysis results with cinematography context. The keys
            "model", "prompt", "cinematography_context_applied" and
            "scene_type" are read; all are optional.
        processing_time: Time taken for processing

    Returns:
        Formatted markdown report
    """
    model_used = analysis_data.get("model", "Unknown")
    # `or` fallbacks keep explicit None values from crashing len()/replace()
    prompt_length = len(analysis_data.get("prompt") or "")
    has_cinema_context = analysis_data.get("cinematography_context_applied", False)
    scene_type = str(analysis_data.get("scene_type") or "general")

    context_status = '✅ Applied' if has_cinema_context else '❌ Not Applied'

    # The bullet, chart, check-mark and lightning symbols were stored as
    # mis-decoded byte sequences; they are written as real characters here.
    report_lines = [
        "**🎬 PHRAMER AI ANALYSIS COMPLETE**",
        f"**Model:** {model_used} • **Time:** {processing_time:.1f}s • **Length:** {prompt_length} chars",
        "**📊 CINEMATOGRAPHY ANALYSIS:**",
        f"**Scene Type:** {scene_type.replace('_', ' ').title()}",
        f"**Professional Context:** {context_status}",
        "**🎯 OPTIMIZATIONS APPLIED:**",
        "✅ Professional camera configuration",
        "✅ Cinematography lighting setup",
        "✅ Technical specifications",
        "✅ Multi-engine compatibility",
        "✅ Cinema-quality enhancement",
        "**⚡ Powered by Pariente AI for MIA TV Series**",
    ]
    return "\n".join(report_lines)
def safe_execute(func, *args, **kwargs) -> Tuple[bool, Any]:
    """
    Safely execute a function with error handling

    Args:
        func: Function to execute
        *args: Function arguments
        **kwargs: Function keyword arguments

    Returns:
        Tuple of (success: bool, result: Any). On failure the result is the
        exception message as a string.
    """
    try:
        return True, func(*args, **kwargs)
    except Exception as e:
        # Not every callable has __name__ (functools.partial objects and
        # callable instances do not); looking it up blindly would raise from
        # inside this handler and defeat the purpose of the wrapper.
        func_name = getattr(func, "__name__", None) or repr(func)
        logger.error(f"Safe execution failed for {func_name}: {e}")
        return False, str(e)
def truncate_text(text: str, max_length: int = 100) -> str:
    """
    Truncate text to specified length with ellipsis

    Args:
        text: Text to truncate
        max_length: Maximum length of the returned text

    Returns:
        The text unchanged when it is empty or already fits, otherwise a
        shortened version no longer than ``max_length``.
    """
    if not text or len(text) <= max_length:
        return text
    if max_length < 3:
        # No room for the ellipsis: a negative slice end would keep most of
        # the text and return something longer than the limit.
        return text[:max(max_length, 0)]
    return text[:max_length - 3] + "..."
def enhance_prompt_with_cinematography_knowledge(original_prompt: str, scene_type: str = "default") -> str:
    """
    Run a prompt through the professional photography enhancer.

    Args:
        original_prompt: Base prompt text
        scene_type: Detected scene type (only used in the log message)

    Returns:
        The enhanced prompt, or the original prompt unchanged when the
        enhancer module is missing or the enhancement fails
    """
    try:
        # Imported lazily to avoid circular imports
        from professional_photography import enhance_flux_prompt_with_professional_knowledge

        result = enhance_flux_prompt_with_professional_knowledge(original_prompt)
        logger.info(f"Enhanced prompt with cinematography knowledge for {scene_type} scene")
        return result
    except ImportError:
        logger.warning("Professional photography module not available")
    except Exception as exc:
        logger.warning(f"Cinematography enhancement failed: {exc}")
    # Every failure path ends here with the prompt untouched
    return original_prompt
# Export main functions
# Public API of this module: the names a star-import exposes. The
# underscore-prefixed helpers defined in this file are not listed here.
__all__ = [
"setup_logging",
"optimize_image",
"validate_image",
"clean_memory",
"apply_flux_rules",
"calculate_prompt_score",
"calculate_professional_enhanced_score",
"get_score_grade",
"format_analysis_report",
"safe_execute",
"truncate_text",
"enhance_prompt_with_cinematography_knowledge",
"detect_scene_type_from_analysis"
]