""" | |
Utility functions for Phramer AI | |
By Pariente AI, for MIA TV Series | |
Enhanced with professional cinematography knowledge and intelligent token economy | |
""" | |
import re | |
import logging | |
import gc | |
from typing import Optional, Tuple, Dict, Any, List | |
from PIL import Image | |
import torch | |
import numpy as np | |
from config import PROCESSING_CONFIG, FLUX_RULES, PROFESSIONAL_PHOTOGRAPHY_CONFIG | |
# Configure logging | |
logging.basicConfig(level=logging.INFO) | |
logger = logging.getLogger(__name__) | |
def setup_logging(level: str = "INFO") -> None:
    """Set up logging configuration"""
    logging.basicConfig(
        level=getattr(logging, level.upper()),
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        force=True  # override the module-level basicConfig so this format takes effect
    )

def optimize_image(image: Any) -> Optional[Image.Image]:
    """
    Optimize image for processing

    Args:
        image: Input image (PIL, numpy array, or file path)

    Returns:
        Optimized PIL Image or None if failed
    """
    if image is None:
        return None

    try:
        # Convert to PIL Image if necessary
        if isinstance(image, np.ndarray):
            image = Image.fromarray(image)
        elif isinstance(image, str):
            image = Image.open(image)
        elif not isinstance(image, Image.Image):
            logger.error(f"Unsupported image type: {type(image)}")
            return None

        # Convert to RGB if necessary
        if image.mode != 'RGB':
            image = image.convert('RGB')

        # Resize if too large
        max_size = PROCESSING_CONFIG["max_image_size"]
        if image.size[0] > max_size or image.size[1] > max_size:
            image.thumbnail((max_size, max_size), Image.Resampling.LANCZOS)
            logger.info(f"Image resized to {image.size}")

        return image

    except Exception as e:
        logger.error(f"Image optimization failed: {e}")
        return None

def validate_image(image: Any) -> bool:
    """
    Validate if image is processable

    Args:
        image: Input image to validate

    Returns:
        True if valid, False otherwise
    """
    if image is None:
        return False

    try:
        optimized = optimize_image(image)
        return optimized is not None
    except Exception:
        return False

def clean_memory() -> None:
    """Clean up memory and GPU cache"""
    try:
        gc.collect()
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
            torch.cuda.synchronize()
        logger.debug("Memory cleaned")
    except Exception as e:
        logger.warning(f"Memory cleanup failed: {e}")

def detect_scene_type_from_analysis(analysis_metadata: Dict[str, Any]) -> str:
    """Detect scene type from BAGEL analysis metadata"""
    try:
        # Check if BAGEL provided scene detection
        if "scene_type" in analysis_metadata:
            return analysis_metadata["scene_type"]

        # Check camera setup for scene hints
        camera_setup = analysis_metadata.get("camera_setup", "").lower()
        if any(term in camera_setup for term in ["portrait", "85mm", "135mm"]):
            return "portrait"
        elif any(term in camera_setup for term in ["landscape", "wide", "24mm", "phase one"]):
            return "landscape"
        elif any(term in camera_setup for term in ["street", "35mm", "documentary", "leica"]):
            return "street"
        elif any(term in camera_setup for term in ["cinema", "arri", "red", "anamorphic"]):
            return "cinematic"
        elif any(term in camera_setup for term in ["architecture", "building", "tilt"]):
            return "architectural"

        return "default"
    except Exception as e:
        logger.warning(f"Scene type detection failed: {e}")
        return "default"

def apply_flux_rules(prompt: str, analysis_metadata: Optional[Dict[str, Any]] = None) -> str:
    """
    Apply enhanced prompt optimization with cinematography knowledge and intelligent token economy

    Args:
        prompt: Raw prompt text from BAGEL analysis
        analysis_metadata: Enhanced metadata with cinematography suggestions

    Returns:
        Optimized prompt with professional cinematography terms and efficient token usage
    """
    if not prompt or not isinstance(prompt, str):
        return ""

    try:
        # Step 1: Extract and clean the core description
        core_description = _extract_clean_description(prompt)
        if not core_description:
            return "Professional photograph with technical excellence"

        # Step 2: Get camera configuration
        camera_setup = _get_camera_setup(analysis_metadata, core_description)

        # Step 3: Get essential style keywords
        style_keywords = _get_essential_keywords(core_description, camera_setup, analysis_metadata)

        # Step 4: Build final optimized prompt
        final_prompt = _build_optimized_prompt(core_description, camera_setup, style_keywords)

        logger.info(f"Prompt optimized: {len(prompt)} → {len(final_prompt)} chars")
        return final_prompt

    except Exception as e:
        logger.error(f"Prompt optimization failed: {e}")
        return _create_fallback_prompt(prompt)

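# Illustrative end-to-end example (hypothetical BAGEL output; the exact result depends
# on the input text, metadata, and scene detection):
#   raw = ("This image shows a woman who is wearing a red coat, standing in the middle "
#          "of a busy street. CAMERA_SETUP: Leica M11, 35mm f/1.4 lens, ISO 800")
#   apply_flux_rules(raw)  # no metadata, so the camera falls back to content-based detection
#   # -> roughly: "Person wearing a red coat, in a busy street, shot on Canon EOS R5,
#   #    85mm f/1.4 lens, ISO 200, shallow depth of field, high quality"
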
def _extract_clean_description(prompt: str) -> str:
    """Extract and clean the core description from BAGEL output"""
    try:
        # Remove CAMERA_SETUP section
        if "CAMERA_SETUP:" in prompt:
            description = prompt.split("CAMERA_SETUP:")[0].strip()
        elif "2. CAMERA_SETUP" in prompt:
            description = prompt.split("2. CAMERA_SETUP")[0].strip()
        else:
            description = prompt

        # Remove section headers
        description = re.sub(r'^(DESCRIPTION:|1\.\s*DESCRIPTION:)\s*', '', description, flags=re.IGNORECASE)

        # Remove verbose introduction phrases
        remove_patterns = [
            r'^This image (?:features|shows|depicts|presents|captures)',
            r'^The image (?:features|shows|depicts|presents|captures)',
            r'^This (?:photograph|picture|scene) (?:features|shows|depicts)',
            r'^(?:In this image,?|Looking at this image,?)',
            r'(?:possibly|apparently|seemingly|appears to be|seems to be)',
        ]
        for pattern in remove_patterns:
            description = re.sub(pattern, '', description, flags=re.IGNORECASE)

        # Convert to concise, direct language
        description = _convert_to_direct_language(description)

        # Clean up formatting
        description = re.sub(r'\s+', ' ', description).strip()

        # Limit length for efficiency
        if len(description) > 200:
            sentences = re.split(r'[.!?]', description)
            description = sentences[0] if sentences else description[:200]

        return description.strip()
    except Exception as e:
        logger.warning(f"Description extraction failed: {e}")
        return prompt[:100] if prompt else ""

def _convert_to_direct_language(text: str) -> str:
    """Convert verbose descriptive text to direct, concise language"""
    try:
        # Direct conversions for common verbose phrases
        conversions = [
            # Subject identification
            (r'a (?:person|individual|figure|man|woman) (?:who is|that is)', r'person'),
            (r' (?:who is|that is) (?:wearing|dressed in)', r' wearing'),
            (r' (?:who appears to be|that appears to be)', r''),
            # Location simplification
            (r'(?:what appears to be|what seems to be) (?:a|an)', r''),
            (r'in (?:what looks like|what appears to be) (?:a|an)', r'in'),
            (r'(?:standing|sitting|positioned) in (?:the middle of|the center of)', r'in'),
            # Action simplification
            (r'(?:is|are) (?:currently|presently) (?:engaged in|performing)', r''),
            (r'(?:can be seen|is visible|are visible)', r''),
            # Background simplification
            (r'(?:In the background|Behind (?:him|her|them)),? (?:there (?:is|are)|we can see)', r'Background:'),
            (r'The background (?:features|shows|contains)', r'Background:'),
            # Remove filler words
            (r'\b(?:quite|rather|somewhat|fairly|very|extremely)\b', r''),
            (r'\b(?:overall|generally|typically|usually)\b', r''),
        ]

        result = text
        for pattern, replacement in conversions:
            result = re.sub(pattern, replacement, result, flags=re.IGNORECASE)

        # Clean up extra spaces and punctuation
        result = re.sub(r'\s+', ' ', result)
        result = re.sub(r'\s*,\s*,+', ',', result)
        result = re.sub(r'^\s*,\s*', '', result)

        return result.strip()
    except Exception as e:
        logger.warning(f"Language conversion failed: {e}")
        return text

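# Illustrative example (hypothetical sentence; output shown approximately):
#   _convert_to_direct_language("a man who is wearing a suit can be seen in the background")
#   # -> "person wearing a suit in the background"
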
def _get_camera_setup(analysis_metadata: Optional[Dict[str, Any]], description: str) -> str:
    """Get camera setup configuration"""
    try:
        # Check if BAGEL provided camera setup
        if analysis_metadata and analysis_metadata.get("has_camera_suggestion"):
            camera_setup = analysis_metadata.get("camera_setup", "")
            if camera_setup and len(camera_setup) > 10:
                return _format_camera_setup(camera_setup)

        # Detect scene type and provide appropriate camera setup
        scene_type = _detect_scene_from_content(description)
        return _get_scene_camera_setup(scene_type)
    except Exception as e:
        logger.warning(f"Camera setup detection failed: {e}")
        return "shot on professional camera"

def _format_camera_setup(raw_setup: str) -> str:
    """Format camera setup into clean, concise format"""
    try:
        # Extract camera model
        camera_patterns = [
            r'(Canon EOS R\d+)',
            r'(Sony A\d+[^\s,]*)',
            r'(Leica [^\s,]+)',
            r'(Phase One [^\s,]+)',
            r'(Hasselblad [^\s,]+)',
            r'(ARRI [^\s,]+)',
            r'(RED [^\s,]+)'
        ]

        camera = None
        for pattern in camera_patterns:
            match = re.search(pattern, raw_setup, re.IGNORECASE)
            if match:
                camera = match.group(1)
                break

        # Extract lens info
        lens_pattern = r'(\d+mm[^,]*f/[\d.]+[^,]*)'
        lens_match = re.search(lens_pattern, raw_setup, re.IGNORECASE)
        lens = lens_match.group(1) if lens_match else None

        # Extract ISO
        iso_pattern = r'(ISO \d+)'
        iso_match = re.search(iso_pattern, raw_setup, re.IGNORECASE)
        iso = iso_match.group(1) if iso_match else None

        # Build clean setup
        parts = []
        if camera:
            parts.append(camera)
        if lens:
            parts.append(lens)
        if iso:
            parts.append(iso)

        if parts:
            return f"shot on {', '.join(parts)}"
        else:
            return "professional photography"
    except Exception as e:
        logger.warning(f"Camera setup formatting failed: {e}")
        return "professional photography"

def _detect_scene_from_content(description: str) -> str:
    """Detect scene type from description content"""
    description_lower = description.lower()

    # Scene detection patterns
    if any(term in description_lower for term in ["portrait", "person", "man", "woman", "face"]):
        return "portrait"
    elif any(term in description_lower for term in ["landscape", "mountain", "horizon", "nature", "outdoor"]):
        return "landscape"
    elif any(term in description_lower for term in ["street", "urban", "city", "building", "crowd"]):
        return "street"
    elif any(term in description_lower for term in ["architecture", "building", "structure", "interior"]):
        return "architecture"
    else:
        return "general"

def _get_scene_camera_setup(scene_type: str) -> str:
    """Get camera setup based on scene type"""
    setups = {
        "portrait": "shot on Canon EOS R5, 85mm f/1.4 lens, ISO 200",
        "landscape": "shot on Phase One XT, 24-70mm f/4 lens, ISO 100",
        "street": "shot on Leica M11, 35mm f/1.4 lens, ISO 800",
        "architecture": "shot on Canon EOS R5, 24-70mm f/2.8 lens, ISO 100",
        "general": "shot on Canon EOS R6, 50mm f/1.8 lens, ISO 400"
    }
    return setups.get(scene_type, setups["general"])

def _get_essential_keywords(description: str, camera_setup: str, analysis_metadata: Optional[Dict[str, Any]]) -> List[str]:
    """Get essential style keywords without redundancy"""
    try:
        keywords = []
        description_lower = description.lower()

        # Only add depth of field if not already mentioned
        if "depth" not in description_lower and "bokeh" not in description_lower:
            if any(term in camera_setup for term in ["f/1.4", "f/2.8", "85mm"]):
                keywords.append("shallow depth of field")

        # Add professional photography only if no specific camera mentioned
        if "shot on" not in camera_setup:
            keywords.append("professional photography")

        # Scene-specific keywords
        if "portrait" in description_lower and "studio lighting" not in description_lower:
            keywords.append("professional portrait")

        # Technical quality (only if needed)
        if len(keywords) < 2:
            keywords.append("high quality")

        return keywords[:3]  # Limit to 3 essential keywords
    except Exception as e:
        logger.warning(f"Keyword extraction failed: {e}")
        return ["professional photography"]

def _build_optimized_prompt(description: str, camera_setup: str, keywords: List[str]) -> str:
    """Build final optimized prompt with proper structure"""
    try:
        # Structure: Description + Technical + Style
        parts = []

        # Core description (clean and concise)
        if description:
            parts.append(description)

        # Technical setup
        if camera_setup:
            parts.append(camera_setup)

        # Essential keywords
        if keywords:
            parts.extend(keywords)

        # Join with consistent separator
        result = ", ".join(parts)

        # Final cleanup
        result = re.sub(r'\s*,\s*,+', ',', result)  # Remove double commas
        result = re.sub(r'\s+', ' ', result)  # Clean spaces
        result = result.strip().rstrip(',')  # Remove trailing comma

        # Ensure it starts with a capital letter
        if result:
            result = result[0].upper() + result[1:] if len(result) > 1 else result.upper()

        return result
    except Exception as e:
        logger.error(f"Prompt building failed: {e}")
        return "Professional photograph"

def _create_fallback_prompt(original_prompt: str) -> str:
    """Create fallback prompt when optimization fails"""
    try:
        # Extract first meaningful sentence
        sentences = re.split(r'[.!?]', original_prompt)
        if sentences:
            clean_sentence = sentences[0].strip()
            # Remove verbose starters
            clean_sentence = re.sub(r'^(This image shows|The image depicts|This photograph)', '', clean_sentence, flags=re.IGNORECASE)
            clean_sentence = clean_sentence.strip()

            if len(clean_sentence) > 20:
                return f"{clean_sentence}, professional photography"

        return "Professional photograph with technical excellence"
    except Exception:
        return "Professional photograph"

def calculate_prompt_score(prompt: str, analysis_data: Optional[Dict[str, Any]] = None) -> Tuple[int, Dict[str, int]]:
    """
    Calculate enhanced quality score with professional cinematography criteria

    Args:
        prompt: The prompt to score
        analysis_data: Enhanced analysis data with cinematography context

    Returns:
        Tuple of (total_score, breakdown_dict)
    """
    if not prompt:
        return 0, {"prompt_quality": 0, "technical_details": 0, "professional_cinematography": 0, "multi_engine_optimization": 0}

    breakdown = {}

    # Enhanced Prompt Quality (0-25 points)
    length_score = min(15, len(prompt) // 10)  # Reward appropriate length
    detail_score = min(10, len(prompt.split(',')) * 2)  # Reward structured detail
    breakdown["prompt_quality"] = int(length_score + detail_score)

    # Technical Details with Cinematography Focus (0-25 points)
    tech_score = 0

    # Cinema equipment (higher scores for professional gear)
    cinema_equipment = ['Canon EOS R', 'Sony A1', 'Leica', 'Hasselblad', 'Phase One', 'ARRI', 'RED']
    for equipment in cinema_equipment:
        if equipment.lower() in prompt.lower():
            tech_score += 8
            break

    # Lens specifications
    if re.search(r'\d+mm.*f/[\d.]+', prompt):
        tech_score += 6

    # ISO settings
    if re.search(r'ISO \d+', prompt):
        tech_score += 4

    # Professional terminology
    tech_keywords = ['shot on', 'lens', 'depth of field', 'bokeh']
    tech_score += sum(3 for keyword in tech_keywords if keyword in prompt.lower())

    breakdown["technical_details"] = min(25, tech_score)

    # Professional Cinematography (0-25 points)
    cinema_score = 0

    # Professional lighting techniques
    lighting_terms = ['professional lighting', 'studio lighting', 'natural lighting']
    cinema_score += sum(4 for term in lighting_terms if term in prompt.lower())

    # Composition techniques
    composition_terms = ['composition', 'depth of field', 'bokeh', 'shallow depth']
    cinema_score += sum(3 for term in composition_terms if term in prompt.lower())

    # Professional context bonus
    if analysis_data and analysis_data.get("has_camera_suggestion"):
        cinema_score += 6

    breakdown["professional_cinematography"] = min(25, cinema_score)

    # Multi-Engine Optimization (0-25 points)
    optimization_score = 0

    # Check for technical specifications
    if re.search(r'(?:Canon|Sony|Leica|Phase One)', prompt):
        optimization_score += 10

    # Complete technical specs
    if re.search(r'\d+mm.*f/[\d.]+.*ISO \d+', prompt):
        optimization_score += 8

    # Professional terminology
    pro_terms = ['professional', 'shot on', 'high quality']
    optimization_score += sum(2 for term in pro_terms if term in prompt.lower())

    # Length efficiency bonus (reward conciseness)
    word_count = len(prompt.split())
    if 30 <= word_count <= 60:  # Optimal range
        optimization_score += 5
    elif word_count <= 30:
        optimization_score += 3

    breakdown["multi_engine_optimization"] = min(25, optimization_score)

    # Calculate total
    total_score = sum(breakdown.values())

    return total_score, breakdown

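# Illustrative example (hypothetical prompt; the exact numbers depend on the wording):
#   score, breakdown = calculate_prompt_score(
#       "Portrait of a chef in a kitchen, shot on Canon EOS R5, 85mm f/1.4 lens, "
#       "ISO 200, shallow depth of field, professional photography"
#   )
#   # breakdown contains the four buckets above, each capped at 25 points;
#   # score is their sum (0-100)
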
def calculate_professional_enhanced_score(prompt: str, analysis_data: Optional[Dict[str, Any]] = None) -> Tuple[int, Dict[str, int]]:
    """
    Enhanced scoring with professional cinematography criteria

    Args:
        prompt: The prompt to score
        analysis_data: Analysis data with cinematography context

    Returns:
        Tuple of (total_score, breakdown_dict)
    """
    return calculate_prompt_score(prompt, analysis_data)

def get_score_grade(score: int) -> Dict[str, str]:
    """
    Get grade information for a score

    Args:
        score: Numeric score

    Returns:
        Dictionary with grade and color information
    """
    from config import SCORING_CONFIG

    for threshold, grade_info in sorted(SCORING_CONFIG["grade_thresholds"].items(), reverse=True):
        if score >= threshold:
            return grade_info

    # Default to lowest grade
    return SCORING_CONFIG["grade_thresholds"][0]

def format_analysis_report(analysis_data: Dict[str, Any], processing_time: float) -> str:
    """
    Format analysis data into a readable report with cinematography insights

    Args:
        analysis_data: Analysis results with cinematography context
        processing_time: Time taken for processing

    Returns:
        Formatted markdown report
    """
    model_used = analysis_data.get("model", "Unknown")
    prompt_length = len(analysis_data.get("prompt", ""))
    has_cinema_context = analysis_data.get("cinematography_context_applied", False)
    scene_type = analysis_data.get("scene_type", "general")

    report = f"""**🎬 PHRAMER AI ANALYSIS COMPLETE**

**Model:** {model_used} • **Time:** {processing_time:.1f}s • **Length:** {prompt_length} chars

**📊 CINEMATOGRAPHY ANALYSIS:**
**Scene Type:** {scene_type.replace('_', ' ').title()}
**Professional Context:** {'✅ Applied' if has_cinema_context else '❌ Not Applied'}

**🎯 OPTIMIZATIONS APPLIED:**
✅ Clean description extraction
✅ Professional camera configuration
✅ Essential keyword optimization
✅ Token economy optimization
✅ Multi-engine compatibility
✅ Redundancy elimination

**⚡ Powered by Pariente AI for MIA TV Series**"""

    return report

def safe_execute(func, *args, **kwargs) -> Tuple[bool, Any]:
    """
    Safely execute a function with error handling

    Args:
        func: Function to execute
        *args: Function arguments
        **kwargs: Function keyword arguments

    Returns:
        Tuple of (success: bool, result: Any)
    """
    try:
        result = func(*args, **kwargs)
        return True, result
    except Exception as e:
        logger.error(f"Safe execution failed for {func.__name__}: {e}")
        return False, str(e)

def truncate_text(text: str, max_length: int = 100) -> str:
    """
    Truncate text to specified length with ellipsis

    Args:
        text: Text to truncate
        max_length: Maximum length

    Returns:
        Truncated text
    """
    if not text or len(text) <= max_length:
        return text
    return text[:max_length - 3] + "..."

def enhance_prompt_with_cinematography_knowledge(original_prompt: str, scene_type: str = "default") -> str:
    """
    Enhance prompt with professional cinematography knowledge

    Args:
        original_prompt: Base prompt text
        scene_type: Detected scene type

    Returns:
        Enhanced prompt with cinematography context
    """
    try:
        # Import here to avoid circular imports
        from professional_photography import enhance_flux_prompt_with_professional_knowledge

        # Apply professional cinematography enhancement
        enhanced = enhance_flux_prompt_with_professional_knowledge(original_prompt)
        logger.info(f"Enhanced prompt with cinematography knowledge for {scene_type} scene")
        return enhanced
    except ImportError:
        logger.warning("Professional photography module not available")
        return original_prompt
    except Exception as e:
        logger.warning(f"Cinematography enhancement failed: {e}")
        return original_prompt

# Export main functions
__all__ = [
    "setup_logging",
    "optimize_image",
    "validate_image",
    "clean_memory",
    "apply_flux_rules",
    "calculate_prompt_score",
    "calculate_professional_enhanced_score",
    "get_score_grade",
    "format_analysis_report",
    "safe_execute",
    "truncate_text",
    "enhance_prompt_with_cinematography_knowledge",
    "detect_scene_type_from_analysis"
]
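
# Minimal smoke test, assuming a valid config.py (with PROCESSING_CONFIG etc.) is importable.
# The sample text below is hypothetical; run the module directly to print the optimized
# prompt and its score.
if __name__ == "__main__":
    setup_logging("INFO")
    sample = (
        "This image shows a man who is wearing a dark suit, standing in what appears to be "
        "a modern office lobby. CAMERA_SETUP: Canon EOS R5, 85mm f/1.4 lens, ISO 200"
    )
    optimized = apply_flux_rules(sample)
    score, breakdown = calculate_prompt_score(optimized)
    print(f"Optimized prompt: {optimized}")
    print(f"Score: {score}/100 -> {breakdown}")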