Spaces:
Running
on
Zero
Running
on
Zero
""" | |
Utility functions for Phramer AI | |
By Pariente AI, for MIA TV Series | |
Enhanced with professional cinematography knowledge and intelligent token economy | |
""" | |
import re | |
import logging | |
import gc | |
from typing import Optional, Tuple, Dict, Any, List | |
from PIL import Image | |
import torch | |
import numpy as np | |
from config import PROCESSING_CONFIG, FLUX_RULES, PROFESSIONAL_PHOTOGRAPHY_CONFIG | |
# Configure logging | |
logging.basicConfig(level=logging.INFO) | |
logger = logging.getLogger(__name__) | |
def setup_logging(level: str = "INFO") -> None: | |
"""Setup logging configuration""" | |
logging.basicConfig( | |
level=getattr(logging, level.upper()), | |
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' | |
) | |
def optimize_image(image: Any) -> Optional[Image.Image]: | |
""" | |
Optimize image for processing | |
Args: | |
image: Input image (PIL, numpy array, or file path) | |
Returns: | |
Optimized PIL Image or None if failed | |
""" | |
if image is None: | |
return None | |
try: | |
# Convert to PIL Image if necessary | |
if isinstance(image, np.ndarray): | |
image = Image.fromarray(image) | |
elif isinstance(image, str): | |
image = Image.open(image) | |
elif not isinstance(image, Image.Image): | |
logger.error(f"Unsupported image type: {type(image)}") | |
return None | |
# Convert to RGB if necessary | |
if image.mode != 'RGB': | |
image = image.convert('RGB') | |
# Resize if too large | |
max_size = PROCESSING_CONFIG["max_image_size"] | |
if image.size[0] > max_size or image.size[1] > max_size: | |
image.thumbnail((max_size, max_size), Image.Resampling.LANCZOS) | |
logger.info(f"Image resized to {image.size}") | |
return image | |
except Exception as e: | |
logger.error(f"Image optimization failed: {e}") | |
return None | |
def validate_image(image: Any) -> bool: | |
""" | |
Validate if image is processable | |
Args: | |
image: Input image to validate | |
Returns: | |
True if valid, False otherwise | |
""" | |
if image is None: | |
return False | |
try: | |
optimized = optimize_image(image) | |
return optimized is not None | |
except Exception: | |
return False | |
def clean_memory() -> None: | |
"""Clean up memory and GPU cache""" | |
try: | |
gc.collect() | |
if torch.cuda.is_available(): | |
torch.cuda.empty_cache() | |
torch.cuda.synchronize() | |
logger.debug("Memory cleaned") | |
except Exception as e: | |
logger.warning(f"Memory cleanup failed: {e}") | |
def detect_scene_type_from_analysis(analysis_metadata: Dict[str, Any]) -> str: | |
"""Detect scene type from BAGEL analysis metadata""" | |
try: | |
# Check if BAGEL provided scene detection | |
if "scene_type" in analysis_metadata: | |
return analysis_metadata["scene_type"] | |
# Check camera setup for scene hints | |
camera_setup = analysis_metadata.get("camera_setup", "").lower() | |
if any(term in camera_setup for term in ["portrait", "85mm", "135mm"]): | |
return "portrait" | |
elif any(term in camera_setup for term in ["landscape", "wide", "24mm", "phase one"]): | |
return "landscape" | |
elif any(term in camera_setup for term in ["street", "35mm", "documentary", "leica"]): | |
return "street" | |
elif any(term in camera_setup for term in ["cinema", "arri", "red", "anamorphic"]): | |
return "cinematic" | |
elif any(term in camera_setup for term in ["architecture", "building", "tilt"]): | |
return "architectural" | |
return "default" | |
except Exception as e: | |
logger.warning(f"Scene type detection failed: {e}") | |
return "default" | |
def apply_flux_rules(prompt: str, analysis_metadata: Optional[Dict[str, Any]] = None) -> str: | |
""" | |
Apply enhanced prompt optimization with cinematography knowledge and intelligent token economy | |
Args: | |
prompt: Raw prompt text from BAGEL analysis | |
analysis_metadata: Enhanced metadata with cinematography suggestions | |
Returns: | |
Optimized prompt with professional cinematography terms and efficient token usage | |
""" | |
if not prompt or not isinstance(prompt, str): | |
return "" | |
# Clean the prompt from unwanted elements | |
cleaned_prompt = prompt | |
for pattern in FLUX_RULES["remove_patterns"]: | |
cleaned_prompt = re.sub(pattern, '', cleaned_prompt, flags=re.IGNORECASE) | |
# Extract description part only (remove CAMERA_SETUP section if present) | |
description_part = _extract_description_only(cleaned_prompt) | |
# NEW: Convert to generative language with cinematography angle detection | |
if PROFESSIONAL_PHOTOGRAPHY_CONFIG.get("prompt_condensation", True): | |
description_part = _convert_to_cinematographic_language(description_part) | |
logger.info("Applied cinematographic language conversion") | |
# Check if BAGEL provided intelligent camera setup with cinematography context | |
camera_config = "" | |
scene_type = "default" | |
if analysis_metadata and analysis_metadata.get("has_camera_suggestion") and analysis_metadata.get("camera_setup"): | |
# Use BAGEL's intelligent camera suggestion - enhanced with cinematography knowledge | |
bagel_camera = analysis_metadata["camera_setup"] | |
scene_type = detect_scene_type_from_analysis(analysis_metadata) | |
camera_config = _format_professional_camera_suggestion(bagel_camera, scene_type) | |
logger.info(f"Using BAGEL cinematography suggestion: {camera_config}") | |
else: | |
# Enhanced fallback with professional cinematography knowledge | |
scene_type = _detect_scene_from_description(description_part.lower()) | |
camera_config = _get_enhanced_camera_config(scene_type, description_part.lower()) | |
logger.info(f"Using enhanced cinematography configuration for {scene_type}") | |
# Add enhanced lighting with cinematography principles | |
lighting_enhancement = _get_cinematography_lighting_enhancement(description_part.lower(), camera_config, scene_type) | |
# Add style enhancement for multi-engine compatibility | |
style_enhancement = _get_style_enhancement(scene_type, description_part.lower()) | |
# NEW: Smart keyword insertion with token economy | |
smart_keywords = _apply_smart_keyword_insertion(description_part, camera_config, scene_type) | |
# Build final prompt: Description + Camera + Lighting + Style + Smart Keywords | |
final_prompt = description_part + camera_config + lighting_enhancement + style_enhancement + smart_keywords | |
# NEW: Final length optimization with token economy | |
if PROFESSIONAL_PHOTOGRAPHY_CONFIG.get("prompt_optimization", {}).get("max_length"): | |
final_prompt = _optimize_prompt_with_token_economy(final_prompt) | |
# Clean up formatting | |
final_prompt = _clean_prompt_formatting(final_prompt) | |
return final_prompt | |
def _extract_description_only(prompt: str) -> str: | |
"""Extract only the description part, removing camera setup sections""" | |
# Remove CAMERA_SETUP section if present | |
if "CAMERA_SETUP:" in prompt: | |
parts = prompt.split("CAMERA_SETUP:") | |
description = parts[0].strip() | |
elif "2. CAMERA_SETUP" in prompt: | |
parts = prompt.split("2. CAMERA_SETUP") | |
description = parts[0].strip() | |
else: | |
description = prompt | |
# Remove "DESCRIPTION:" label if present | |
if description.startswith("DESCRIPTION:"): | |
description = description.replace("DESCRIPTION:", "").strip() | |
elif description.startswith("1. DESCRIPTION:"): | |
description = description.replace("1. DESCRIPTION:", "").strip() | |
# Clean up any remaining camera recommendations from the description | |
description = re.sub(r'For this type of scene.*?shooting style would be.*?\.', '', description, flags=re.DOTALL) | |
description = re.sub(r'I would recommend.*?aperture.*?\.', '', description, flags=re.DOTALL) | |
description = re.sub(r'Professional Context:.*?\.', '', description, flags=re.DOTALL) | |
description = re.sub(r'Cinematography context:.*?\.', '', description, flags=re.DOTALL) | |
# Remove numbered section residues | |
description = re.sub(r'\s*\d+\.\s*,?\s*$', '', description) | |
description = re.sub(r'\s*\d+\.\s*,?\s*', ' ', description) | |
return description.strip() | |
def _detect_camera_angles(description: str) -> List[str]: | |
"""Detect camera angles and perspectives using professional cinematography knowledge""" | |
try: | |
angles_detected = [] | |
description_lower = description.lower() | |
# Low angle (contrapicado) detection | |
low_angle_indicators = [ | |
"looking up at", "from below", "upward angle", "towering", "looming", | |
"shot from ground level", "worm's eye", "low angle" | |
] | |
if any(indicator in description_lower for indicator in low_angle_indicators): | |
angles_detected.append("low-angle shot") | |
# High angle (picado) detection | |
high_angle_indicators = [ | |
"looking down", "from above", "overhead", "bird's eye", "aerial view", | |
"downward angle", "top-down", "high angle" | |
] | |
if any(indicator in description_lower for indicator in high_angle_indicators): | |
angles_detected.append("high-angle shot") | |
# Eye level detection | |
eye_level_indicators = [ | |
"eye level", "straight on", "direct view", "level with" | |
] | |
if any(indicator in description_lower for indicator in eye_level_indicators): | |
angles_detected.append("eye-level shot") | |
# Dutch angle detection | |
dutch_indicators = [ | |
"tilted", "angled", "diagonal", "off-kilter", "dutch angle" | |
] | |
if any(indicator in description_lower for indicator in dutch_indicators): | |
angles_detected.append("dutch angle") | |
# Perspective analysis for mixed angles | |
if ("foreground" in description_lower and "background" in description_lower): | |
if ("close" in description_lower or "prominent" in description_lower) and "blurred" in description_lower: | |
# Suggests foreground element shot from specific angle with background perspective | |
if not angles_detected: # Only add if no specific angle detected | |
angles_detected.append("shallow depth perspective") | |
logger.info(f"Camera angles detected: {angles_detected}") | |
return angles_detected | |
except Exception as e: | |
logger.warning(f"Camera angle detection failed: {e}") | |
return [] | |
def _convert_to_cinematographic_language(description: str) -> str: | |
"""Convert descriptive analysis to cinematographic prompt language with angle detection""" | |
try: | |
# First detect camera angles | |
camera_angles = _detect_camera_angles(description) | |
generative = description | |
# Remove descriptive introduction phrases | |
descriptive_intros = [ | |
r'This image (?:features|shows|depicts|presents|displays)', | |
r'The image (?:features|shows|depicts|presents|displays)', | |
r'This (?:photograph|picture|scene|composition) (?:features|shows|depicts)', | |
r'The (?:photograph|picture|scene|composition) (?:features|shows|depicts)', | |
r'This is (?:a|an) (?:image|photograph|picture) (?:of|showing)', | |
r'The setting (?:appears to be|is)', | |
r'The scene (?:appears to be|is|shows)', | |
] | |
for pattern in descriptive_intros: | |
generative = re.sub(pattern, '', generative, flags=re.IGNORECASE) | |
# Remove uncertainty and verbose connector phrases | |
verbose_phrases = [ | |
r'possibly (?:a|an) ', | |
r'appears to be (?:a|an) ', | |
r'seems to be (?:a|an) ', | |
r'might be (?:a|an) ', | |
r'could be (?:a|an) ', | |
r'suggests (?:a|an) ', | |
r'indicating (?:a|an) ', | |
r'(?:possibly|apparently|seemingly|likely)', | |
r'which (?:is|are|creates|adds)', | |
r'(?:In the background|In the foreground), (?:there are|there is)', | |
r'(?:The background|The foreground) (?:features|shows|contains)', | |
r'(?:There are|There is) [^,]+ (?:in the background|in the foreground)', | |
r'The overall (?:setting|atmosphere|mood) (?:suggests|indicates)', | |
] | |
for pattern in verbose_phrases: | |
generative = re.sub(pattern, '', generative, flags=re.IGNORECASE) | |
# Convert spatial relationships to cinematographic terms | |
spatial_conversions = [ | |
# Background/foreground to cinematographic terms | |
(r'prominently displayed in (?:the )?foreground', 'foreground focus'), | |
(r'in (?:the )?foreground', 'foreground'), | |
(r'in (?:the )?background', 'background'), | |
(r'blurred (?:figures|people|objects)', 'bokeh blur'), | |
(r'out of focus', 'soft focus'), | |
# Convert descriptive structure to noun phrases | |
(r'(?:close-up|medium shot|wide shot) of (?:a|an|the) ', r'close-up '), | |
(r'(?:a|an|the) (\w+)', r'\1'), | |
# Remove excessive connecting words | |
(r'(?:, and|, with|, featuring)', ','), | |
# Simplify location descriptions | |
(r'on (?:a|an|the) ', r'on '), | |
(r'in (?:a|an|the) ', r'in '), | |
] | |
for pattern, replacement in spatial_conversions: | |
generative = re.sub(pattern, replacement, generative, flags=re.IGNORECASE) | |
# Convert action descriptions to present participles | |
action_conversions = [ | |
(r'(\w+) (?:are|is) walking', r'\1 walking'), | |
(r'(\w+) (?:are|is) standing', r'\1 standing'), | |
(r'(\w+) (?:are|is) sitting', r'\1 sitting'), | |
(r'people (?:are|is) out of focus', r'blurred people'), | |
] | |
for pattern, replacement in action_conversions: | |
generative = re.sub(pattern, replacement, generative, flags=re.IGNORECASE) | |
# Add detected camera angles at the beginning | |
if camera_angles: | |
angle_prefix = ", ".join(camera_angles) | |
generative = f"{angle_prefix}, {generative}" | |
# Clean up extra spaces and punctuation | |
generative = re.sub(r'\s+', ' ', generative) | |
generative = re.sub(r'^\s*,\s*', '', generative) # Remove leading commas | |
generative = re.sub(r'\s*,\s*,+', ',', generative) # Remove double commas | |
generative = re.sub(r'\.+', '.', generative) # Remove multiple periods | |
# Ensure it starts with a capital letter | |
generative = generative.strip() | |
if generative: | |
generative = generative[0].upper() + generative[1:] if len(generative) > 1 else generative.upper() | |
logger.info(f"Cinematographic conversion: angles={len(camera_angles)}, {len(description)} β {len(generative)} chars") | |
return generative | |
except Exception as e: | |
logger.warning(f"Cinematographic language conversion failed: {e}") | |
return description | |
def _apply_smart_keyword_insertion(description: str, camera_config: str, scene_type: str) -> str: | |
"""Smart keyword insertion with token economy - avoid redundancy""" | |
try: | |
keywords = [] | |
# Token Economy Rule 1: If camera specs exist, skip "photorealistic" keywords | |
has_camera_specs = bool(re.search(r'(?:Canon|Sony|Leica|ARRI|RED|Hasselblad|Phase One)', camera_config)) | |
has_lens_specs = bool(re.search(r'\d+mm.*f/[\d.]+', camera_config)) | |
# Only add quality keywords if NO technical specs present | |
if not (has_camera_specs and has_lens_specs): | |
quality_keywords = FLUX_RULES.get("mandatory_keywords", {}).get("quality", []) | |
keywords.extend(quality_keywords[:2]) # Limit to 2 quality keywords max | |
logger.info("Added fallback quality keywords (no camera specs detected)") | |
else: | |
logger.info("Skipped redundant quality keywords (camera specs present)") | |
# Token Economy Rule 2: Scene-specific keywords only if they add value | |
style_by_scene = FLUX_RULES.get("mandatory_keywords", {}).get("style_by_scene", {}) | |
if scene_type in style_by_scene: | |
scene_keywords = style_by_scene[scene_type] | |
# Check if scene keywords are already implied by camera config or description | |
for keyword in scene_keywords: | |
if keyword.lower() not in camera_config.lower() and keyword.lower() not in description.lower(): | |
keywords.append(keyword) | |
# Token Economy Rule 3: Technical keywords only if not redundant | |
technical_keywords = FLUX_RULES.get("mandatory_keywords", {}).get("technical", []) | |
for tech_keyword in technical_keywords: | |
# Skip "professional photography" if camera specs already indicate professional level | |
if tech_keyword == "professional photography" and has_camera_specs: | |
continue | |
# Skip "high resolution" if camera specs include resolution indicators | |
if tech_keyword == "high resolution" and has_camera_specs: | |
continue | |
keywords.append(tech_keyword) | |
# Remove duplicates while preserving order | |
unique_keywords = [] | |
for keyword in keywords: | |
if keyword not in unique_keywords: | |
unique_keywords.append(keyword) | |
if unique_keywords: | |
result = ", " + ", ".join(unique_keywords) | |
logger.info(f"Smart keywords applied: {unique_keywords}") | |
return result | |
else: | |
logger.info("No additional keywords needed (all redundant)") | |
return "" | |
except Exception as e: | |
logger.warning(f"Smart keyword insertion failed: {e}") | |
return "" | |
def _optimize_prompt_with_token_economy(prompt: str) -> str: | |
"""Optimize prompt length with intelligent token economy""" | |
try: | |
max_words = PROFESSIONAL_PHOTOGRAPHY_CONFIG.get("prompt_optimization", {}).get("max_length", 150) | |
words = prompt.split() | |
if len(words) <= max_words: | |
return prompt | |
# Priority preservation order for token economy | |
essential_patterns = [ | |
# 1. Camera angles (highest priority) | |
r'(?:low-angle|high-angle|eye-level|dutch angle|bird\'s eye|worm\'s eye) shot', | |
# 2. Camera and lens specs | |
r'(?:Canon|Sony|Leica|ARRI|RED|Hasselblad|Phase One) [^,]+', | |
r'\d+mm[^,]*f/[\d.]+[^,]*', | |
r'ISO \d+', | |
# 3. Core subject and composition | |
r'(?:close-up|medium shot|wide shot|shallow depth)', | |
r'(?:foreground|background|bokeh)', | |
# 4. Scene-specific technical terms | |
r'(?:cinematic|anamorphic|telephoto|wide-angle)', | |
] | |
# Extract essential parts first | |
essential_parts = [] | |
remaining_text = prompt | |
for pattern in essential_patterns: | |
matches = re.findall(pattern, remaining_text, re.IGNORECASE) | |
for match in matches: | |
if match not in essential_parts: | |
essential_parts.append(match) | |
# Remove from remaining text to avoid duplication | |
remaining_text = re.sub(re.escape(match), '', remaining_text, count=1, flags=re.IGNORECASE) | |
# Add essential parts to start | |
optimized_words = [] | |
for part in essential_parts: | |
optimized_words.extend(part.split()) | |
# Fill remaining space with most important remaining words | |
remaining_words = [w for w in remaining_text.split() if w.strip() and w not in optimized_words] | |
remaining_space = max_words - len(optimized_words) | |
if remaining_space > 0: | |
optimized_words.extend(remaining_words[:remaining_space]) | |
optimized = " ".join(optimized_words[:max_words]) | |
logger.info(f"Token economy optimization: {len(words)} β {len(optimized_words)} words, preserved {len(essential_parts)} essential elements") | |
return optimized | |
except Exception as e: | |
logger.warning(f"Token economy optimization failed: {e}") | |
return prompt | |
def _detect_scene_from_description(description_lower: str) -> str: | |
"""Enhanced scene detection from description with cinematography knowledge""" | |
scene_keywords = PROFESSIONAL_PHOTOGRAPHY_CONFIG.get("scene_detection_keywords", {}) | |
# Score each scene type | |
scene_scores = {} | |
for scene_type, keywords in scene_keywords.items(): | |
score = sum(1 for keyword in keywords if keyword in description_lower) | |
if score > 0: | |
scene_scores[scene_type] = score | |
# Additional cinematography-specific detection | |
if any(term in description_lower for term in ["film", "movie", "cinematic", "dramatic lighting", "anamorphic"]): | |
scene_scores["cinematic"] = scene_scores.get("cinematic", 0) + 2 | |
if any(term in description_lower for term in ["studio", "controlled lighting", "professional portrait"]): | |
scene_scores["portrait"] = scene_scores.get("portrait", 0) + 2 | |
# Return highest scoring scene type | |
if scene_scores: | |
return max(scene_scores.items(), key=lambda x: x[1])[0] | |
else: | |
return "default" | |
def _format_professional_camera_suggestion(bagel_camera: str, scene_type: str) -> str: | |
"""Format BAGEL's camera suggestion with enhanced cinematography knowledge and fix formatting errors""" | |
try: | |
camera_text = bagel_camera.strip() | |
camera_text = re.sub(r'^CAMERA_SETUP:\s*', '', camera_text) | |
# Enhanced extraction patterns for cinema equipment | |
cinema_patterns = { | |
'camera': r'(ARRI [^,]+|RED [^,]+|Canon EOS [^,]+|Sony A[^,]+|Leica [^,]+|Hasselblad [^,]+|Phase One [^,]+)', | |
'lens': r'(\d+mm[^,]*(?:anamorphic)?[^,]*)', | |
'aperture': r'(f/[\d.]+)' | |
} | |
extracted_parts = [] | |
camera_model = None | |
lens_spec = None | |
aperture_spec = None | |
# Extract camera | |
camera_match = re.search(cinema_patterns['camera'], camera_text, re.IGNORECASE) | |
if camera_match: | |
camera_model = camera_match.group(1).strip() | |
# Extract lens | |
lens_match = re.search(cinema_patterns['lens'], camera_text, re.IGNORECASE) | |
if lens_match: | |
lens_spec = lens_match.group(1).strip() | |
# Extract aperture | |
aperture_match = re.search(cinema_patterns['aperture'], camera_text, re.IGNORECASE) | |
if aperture_match: | |
aperture_spec = aperture_match.group(1).strip() | |
# Build proper camera setup with all technical specs | |
if camera_model and lens_spec: | |
# Fix the "with, 35mm" error by proper formatting | |
camera_setup = f"{camera_model}, {lens_spec}" | |
# Add aperture if found | |
if aperture_spec: | |
if 'f/' not in lens_spec: # Don't duplicate aperture | |
camera_setup += f" at {aperture_spec}" | |
# Add ISO and composition based on scene type | |
enhanced_config = _get_enhanced_camera_config(scene_type, "") | |
# Extract ISO and composition from enhanced config | |
iso_match = re.search(r'ISO \d+', enhanced_config) | |
composition_match = re.search(r'(rule of thirds|leading lines|symmetrical|centered|hyperfocal distance)[^,]*', enhanced_config) | |
if iso_match: | |
camera_setup += f", {iso_match.group()}" | |
if composition_match: | |
camera_setup += f", {composition_match.group()}" | |
# Scene-specific enhancement with token economy | |
if scene_type == "cinematic": | |
result = f", Shot on {camera_setup}" # Skip redundant "cinematic photography" | |
elif scene_type == "portrait": | |
result = f", Shot on {camera_setup}" # Skip redundant "professional portrait photography" | |
else: | |
result = f", Shot on {camera_setup}" | |
logger.info(f"Formatted camera setup with token economy: {result}") | |
return result | |
else: | |
# Fallback to enhanced config if parsing fails | |
return _get_enhanced_camera_config(scene_type, camera_text.lower()) | |
except Exception as e: | |
logger.warning(f"Failed to format professional camera suggestion: {e}") | |
return _get_enhanced_camera_config(scene_type, "") | |
def _get_enhanced_camera_config(scene_type: str, description_lower: str) -> str: | |
"""Get enhanced camera configuration with cinematography knowledge""" | |
# Enhanced camera configurations with cinema equipment | |
enhanced_configs = { | |
"cinematic": ", Shot on ARRI Alexa LF, 35mm anamorphic lens at f/2.8, ISO 400", | |
"portrait": ", Shot on Canon EOS R5, 85mm f/1.4 lens at f/2.8, ISO 200, rule of thirds", | |
"landscape": ", Shot on Phase One XT, 24-70mm f/4 lens at f/8, ISO 100, hyperfocal distance", | |
"street": ", Shot on Leica M11, 35mm f/1.4 lens at f/2.8, ISO 800", | |
"architectural": ", Shot on Canon EOS R5, 24-70mm f/2.8 lens at f/8, ISO 100, symmetrical composition", | |
"commercial": ", Shot on Hasselblad X2D 100C, 90mm f/2.5 lens at f/4, ISO 100" | |
} | |
# Use enhanced config if available, otherwise fall back to FLUX_RULES | |
if scene_type in enhanced_configs: | |
return enhanced_configs[scene_type] | |
elif scene_type in FLUX_RULES["camera_configs"]: | |
return FLUX_RULES["camera_configs"][scene_type] | |
else: | |
return FLUX_RULES["camera_configs"]["default"] | |
def _get_cinematography_lighting_enhancement(description_lower: str, camera_config: str, scene_type: str) -> str: | |
"""Enhanced lighting with cinematography principles""" | |
# Don't add lighting if already mentioned | |
if any(term in description_lower for term in ["lighting", "lit", "illuminated"]) or 'lighting' in camera_config.lower(): | |
return "" | |
# Enhanced lighting based on scene type and cinematography knowledge | |
if scene_type == "cinematic": | |
if any(term in description_lower for term in ["dramatic", "moody", "dark"]): | |
return ", dramatic lighting" | |
else: | |
return ", cinematic lighting" | |
elif scene_type == "portrait": | |
return ", studio lighting" | |
elif "dramatic" in description_lower or "chaos" in description_lower: | |
return ", dramatic lighting" | |
else: | |
return "" # Skip redundant lighting terms | |
def _get_style_enhancement(scene_type: str, description_lower: str) -> str: | |
"""Get style enhancement for multi-engine compatibility with token economy""" | |
# Token economy: only add style if it adds unique value | |
if scene_type == "cinematic": | |
if "film grain" not in description_lower: | |
return ", film grain" | |
elif scene_type == "architectural": | |
return ", clean lines" | |
return "" # Skip redundant style terms | |
def _clean_prompt_formatting(prompt: str) -> str: | |
"""Clean up prompt formatting""" | |
if not prompt: | |
return "" | |
# Ensure it starts with capital letter | |
prompt = prompt.strip() | |
if prompt: | |
prompt = prompt[0].upper() + prompt[1:] if len(prompt) > 1 else prompt.upper() | |
# Clean up spaces and commas | |
prompt = re.sub(r'\s+', ' ', prompt) | |
prompt = re.sub(r',\s*,+', ',', prompt) | |
prompt = re.sub(r'^\s*,\s*', '', prompt) # Remove leading commas | |
prompt = re.sub(r'\s*,\s*$', '', prompt) # Remove trailing commas | |
# Remove redundant periods | |
prompt = re.sub(r'\.+', '.', prompt) | |
return prompt.strip() | |
def calculate_prompt_score(prompt: str, analysis_data: Optional[Dict[str, Any]] = None) -> Tuple[int, Dict[str, int]]: | |
""" | |
Calculate enhanced quality score with professional cinematography criteria | |
Args: | |
prompt: The prompt to score | |
analysis_data: Enhanced analysis data with cinematography context | |
Returns: | |
Tuple of (total_score, breakdown_dict) | |
""" | |
if not prompt: | |
return 0, {"prompt_quality": 0, "technical_details": 0, "professional_cinematography": 0, "multi_engine_optimization": 0} | |
breakdown = {} | |
# Enhanced Prompt Quality (0-25 points) | |
length_score = min(15, len(prompt) // 10) # Reward appropriate length | |
detail_score = min(10, len(prompt.split(',')) * 1.5) # Reward structured detail | |
breakdown["prompt_quality"] = int(length_score + detail_score) | |
# Technical Details with Cinematography Focus (0-25 points) | |
tech_score = 0 | |
# Cinema equipment (higher scores for professional gear) | |
cinema_equipment = ['ARRI', 'RED', 'Canon EOS R', 'Sony A1', 'Leica', 'Hasselblad', 'Phase One'] | |
for equipment in cinema_equipment: | |
if equipment.lower() in prompt.lower(): | |
tech_score += 6 | |
break | |
# Lens specifications | |
if re.search(r'\d+mm.*f/[\d.]+', prompt): | |
tech_score += 5 | |
# Camera angles (NEW - high value) | |
angle_terms = ['low-angle shot', 'high-angle shot', 'eye-level shot', 'dutch angle', 'bird\'s eye', 'worm\'s eye'] | |
tech_score += sum(4 for term in angle_terms if term in prompt.lower()) | |
# Anamorphic and specialized lenses | |
if 'anamorphic' in prompt.lower(): | |
tech_score += 4 | |
# Professional terminology | |
tech_keywords = ['shot on', 'lens', 'cinematography', 'lighting'] | |
for keyword in tech_keywords: | |
if keyword in prompt.lower(): | |
tech_score += 2 | |
# Bonus for BAGEL cinematography suggestions | |
if analysis_data and analysis_data.get("has_camera_suggestion"): | |
tech_score += 8 | |
breakdown["technical_details"] = min(25, tech_score) | |
# Professional Cinematography (0-25 points) - Enhanced with angle detection | |
cinema_score = 0 | |
# Camera angles (high value for professional cinematography) | |
angle_terms = ['low-angle', 'high-angle', 'eye-level', 'dutch angle', 'bird\'s eye', 'worm\'s eye'] | |
cinema_score += sum(5 for term in angle_terms if term in prompt.lower()) | |
# Professional lighting techniques | |
lighting_terms = ['cinematic lighting', 'dramatic lighting', 'studio lighting', 'rim light', 'practical lights'] | |
cinema_score += sum(3 for term in lighting_terms if term in prompt.lower()) | |
# Composition techniques | |
composition_terms = ['composition', 'framing', 'depth of field', 'bokeh', 'rule of thirds', 'foreground', 'background'] | |
cinema_score += sum(2 for term in composition_terms if term in prompt.lower()) | |
# Cinematography style elements | |
style_terms = ['film grain', 'anamorphic', 'telephoto compression', 'wide-angle', 'shallow depth'] | |
cinema_score += sum(3 for term in style_terms if term in prompt.lower()) | |
# Professional context bonus | |
if analysis_data and analysis_data.get("cinematography_context_applied"): | |
cinema_score += 5 | |
breakdown["professional_cinematography"] = min(25, cinema_score) | |
# Multi-Engine Optimization (0-25 points) - Token economy aware | |
optimization_score = 0 | |
# Check for technical specifications (more valuable than generic keywords) | |
if re.search(r'(?:Canon|Sony|Leica|ARRI|RED|Hasselblad|Phase One)', prompt): | |
optimization_score += 8 # Higher score for actual camera specs | |
if re.search(r'\d+mm.*f/[\d.]+.*ISO \d+', prompt): | |
optimization_score += 7 # Complete technical specs | |
# Token economy bonus: penalize redundant keywords | |
redundant_keywords = ['photorealistic', 'ultra-detailed', 'professional photography'] | |
has_camera_specs = bool(re.search(r'(?:Canon|Sony|Leica|ARRI|RED)', prompt)) | |
if has_camera_specs: | |
# Bonus for NOT having redundant keywords when camera specs present | |
redundant_count = sum(1 for keyword in redundant_keywords if keyword in prompt.lower()) | |
optimization_score += max(0, 5 - redundant_count * 2) # Penalty for redundancy | |
else: | |
# If no camera specs, quality keywords are valuable | |
quality_keywords = sum(1 for keyword in redundant_keywords if keyword in prompt.lower()) | |
optimization_score += min(5, quality_keywords * 2) | |
# Scene-specific optimization | |
if any(style in prompt for style in FLUX_RULES.get("style_enhancements", {}).values()): | |
optimization_score += 3 | |
# Length efficiency bonus | |
word_count = len(prompt.split()) | |
if word_count <= 120: # Reward conciseness | |
optimization_score += 2 | |
breakdown["multi_engine_optimization"] = min(25, optimization_score) | |
# Calculate total with enhanced weighting | |
total_score = sum(breakdown.values()) | |
return total_score, breakdown | |
def calculate_professional_enhanced_score(prompt: str, analysis_data: Optional[Dict[str, Any]] = None) -> Tuple[int, Dict[str, int]]: | |
""" | |
Enhanced scoring with professional cinematography criteria | |
Args: | |
prompt: The prompt to score | |
analysis_data: Analysis data with cinematography context | |
Returns: | |
Tuple of (total_score, breakdown_dict) | |
""" | |
# Use the enhanced scoring system | |
return calculate_prompt_score(prompt, analysis_data) | |
def get_score_grade(score: int) -> Dict[str, str]: | |
""" | |
Get grade information for a score | |
Args: | |
score: Numeric score | |
Returns: | |
Dictionary with grade and color information | |
""" | |
from config import SCORING_CONFIG | |
for threshold, grade_info in sorted(SCORING_CONFIG["grade_thresholds"].items(), reverse=True): | |
if score >= threshold: | |
return grade_info | |
# Default to lowest grade | |
return SCORING_CONFIG["grade_thresholds"][0] | |
def format_analysis_report(analysis_data: Dict[str, Any], processing_time: float) -> str: | |
""" | |
Format analysis data into a readable report with cinematography insights | |
Args: | |
analysis_data: Analysis results with cinematography context | |
processing_time: Time taken for processing | |
Returns: | |
Formatted markdown report | |
""" | |
model_used = analysis_data.get("model", "Unknown") | |
prompt_length = len(analysis_data.get("prompt", "")) | |
has_cinema_context = analysis_data.get("cinematography_context_applied", False) | |
scene_type = analysis_data.get("scene_type", "general") | |
report = f"""**π¬ PHRAMER AI ANALYSIS COMPLETE** | |
**Model:** {model_used} β’ **Time:** {processing_time:.1f}s β’ **Length:** {prompt_length} chars | |
**π CINEMATOGRAPHY ANALYSIS:** | |
**Scene Type:** {scene_type.replace('_', ' ').title()} | |
**Professional Context:** {'β Applied' if has_cinema_context else 'β Not Applied'} | |
**π― OPTIMIZATIONS APPLIED:** | |
β Camera angle detection | |
β Professional camera configuration | |
β Cinematography lighting setup | |
β Token economy optimization | |
β Multi-engine compatibility | |
β Redundancy elimination | |
**β‘ Powered by Pariente AI for MIA TV Series**""" | |
return report | |
def safe_execute(func, *args, **kwargs) -> Tuple[bool, Any]: | |
""" | |
Safely execute a function with error handling | |
Args: | |
func: Function to execute | |
*args: Function arguments | |
**kwargs: Function keyword arguments | |
Returns: | |
Tuple of (success: bool, result: Any) | |
""" | |
try: | |
result = func(*args, **kwargs) | |
return True, result | |
except Exception as e: | |
logger.error(f"Safe execution failed for {func.__name__}: {e}") | |
return False, str(e) | |
def truncate_text(text: str, max_length: int = 100) -> str: | |
""" | |
Truncate text to specified length with ellipsis | |
Args: | |
text: Text to truncate | |
max_length: Maximum length | |
Returns: | |
Truncated text | |
""" | |
if not text or len(text) <= max_length: | |
return text | |
return text[:max_length-3] + "..." | |
def enhance_prompt_with_cinematography_knowledge(original_prompt: str, scene_type: str = "default") -> str: | |
""" | |
Enhance prompt with professional cinematography knowledge | |
Args: | |
original_prompt: Base prompt text | |
scene_type: Detected scene type | |
Returns: | |
Enhanced prompt with cinematography context | |
""" | |
try: | |
# Import here to avoid circular imports | |
from professional_photography import enhance_flux_prompt_with_professional_knowledge | |
# Apply professional cinematography enhancement | |
enhanced = enhance_flux_prompt_with_professional_knowledge(original_prompt) | |
logger.info(f"Enhanced prompt with cinematography knowledge for {scene_type} scene") | |
return enhanced | |
except ImportError: | |
logger.warning("Professional photography module not available") | |
return original_prompt | |
except Exception as e: | |
logger.warning(f"Cinematography enhancement failed: {e}") | |
return original_prompt | |
# Export main functions | |
__all__ = [ | |
"setup_logging", | |
"optimize_image", | |
"validate_image", | |
"clean_memory", | |
"apply_flux_rules", | |
"calculate_prompt_score", | |
"calculate_professional_enhanced_score", | |
"get_score_grade", | |
"format_analysis_report", | |
"safe_execute", | |
"truncate_text", | |
"enhance_prompt_with_cinematography_knowledge", | |
"detect_scene_type_from_analysis" | |
] |