Spaces:
Running
on
Zero
Running
on
Zero
| """ | |
| Utility functions for Phramer AI | |
| By Pariente AI, for MIA TV Series | |
| Enhanced with professional cinematography knowledge and intelligent token economy | |
| """ | |
| import re | |
| import logging | |
| import gc | |
| from typing import Optional, Tuple, Dict, Any, List | |
| from PIL import Image | |
| import torch | |
| import numpy as np | |
| from config import PROCESSING_CONFIG, FLUX_RULES, PROFESSIONAL_PHOTOGRAPHY_CONFIG | |
| # Configure logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| def setup_logging(level: str = "INFO") -> None: | |
| """Setup logging configuration""" | |
| logging.basicConfig( | |
| level=getattr(logging, level.upper()), | |
| format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' | |
| ) | |
| def optimize_image(image: Any) -> Optional[Image.Image]: | |
| """ | |
| Optimize image for processing | |
| Args: | |
| image: Input image (PIL, numpy array, or file path) | |
| Returns: | |
| Optimized PIL Image or None if failed | |
| """ | |
| if image is None: | |
| return None | |
| try: | |
| # Convert to PIL Image if necessary | |
| if isinstance(image, np.ndarray): | |
| image = Image.fromarray(image) | |
| elif isinstance(image, str): | |
| image = Image.open(image) | |
| elif not isinstance(image, Image.Image): | |
| logger.error(f"Unsupported image type: {type(image)}") | |
| return None | |
| # Convert to RGB if necessary | |
| if image.mode != 'RGB': | |
| image = image.convert('RGB') | |
| # Resize if too large | |
| max_size = PROCESSING_CONFIG["max_image_size"] | |
| if image.size[0] > max_size or image.size[1] > max_size: | |
| image.thumbnail((max_size, max_size), Image.Resampling.LANCZOS) | |
| logger.info(f"Image resized to {image.size}") | |
| return image | |
| except Exception as e: | |
| logger.error(f"Image optimization failed: {e}") | |
| return None | |
| def validate_image(image: Any) -> bool: | |
| """ | |
| Validate if image is processable | |
| Args: | |
| image: Input image to validate | |
| Returns: | |
| True if valid, False otherwise | |
| """ | |
| if image is None: | |
| return False | |
| try: | |
| optimized = optimize_image(image) | |
| return optimized is not None | |
| except Exception: | |
| return False | |
| def clean_memory() -> None: | |
| """Clean up memory and GPU cache""" | |
| try: | |
| gc.collect() | |
| if torch.cuda.is_available(): | |
| torch.cuda.empty_cache() | |
| torch.cuda.synchronize() | |
| logger.debug("Memory cleaned") | |
| except Exception as e: | |
| logger.warning(f"Memory cleanup failed: {e}") | |
| def detect_scene_type_from_analysis(analysis_metadata: Dict[str, Any]) -> str: | |
| """Detect scene type from BAGEL analysis metadata""" | |
| try: | |
| # Check if BAGEL provided scene detection | |
| if "scene_type" in analysis_metadata: | |
| return analysis_metadata["scene_type"] | |
| # Check camera setup for scene hints | |
| camera_setup = analysis_metadata.get("camera_setup", "").lower() | |
| if any(term in camera_setup for term in ["portrait", "85mm", "135mm"]): | |
| return "portrait" | |
| elif any(term in camera_setup for term in ["landscape", "wide", "24mm", "phase one"]): | |
| return "landscape" | |
| elif any(term in camera_setup for term in ["street", "35mm", "documentary", "leica"]): | |
| return "street" | |
| elif any(term in camera_setup for term in ["cinema", "arri", "red", "anamorphic"]): | |
| return "cinematic" | |
| elif any(term in camera_setup for term in ["architecture", "building", "tilt"]): | |
| return "architectural" | |
| return "default" | |
| except Exception as e: | |
| logger.warning(f"Scene type detection failed: {e}") | |
| return "default" | |
| def apply_flux_rules(prompt: str, analysis_metadata: Optional[Dict[str, Any]] = None) -> str: | |
| """ | |
| Apply enhanced prompt optimization - FORMAT ONLY, do not filter content | |
| Let professional_photography.py do ALL the cinematographic work | |
| Args: | |
| prompt: Raw prompt text from BAGEL analysis (already enriched by professional_photography.py) | |
| analysis_metadata: Enhanced metadata with cinematography suggestions | |
| Returns: | |
| Clean formatted prompt preserving ALL professional cinematography content | |
| """ | |
| if not prompt or not isinstance(prompt, str): | |
| return "" | |
| try: | |
| # Step 1: Extract the rich professional description (preserve ALL content) | |
| description = _extract_professional_description(prompt) | |
| # Step 2: Extract camera setup if provided by BAGEL | |
| camera_setup = _extract_camera_setup(prompt, analysis_metadata) | |
| # Step 3: Format into clean structure (NO filtering) | |
| formatted_prompt = _format_professional_prompt(description, camera_setup) | |
| logger.info(f"Professional prompt formatted: {len(prompt)} β {len(formatted_prompt)} chars") | |
| return formatted_prompt | |
| except Exception as e: | |
| logger.error(f"Professional prompt formatting failed: {e}") | |
| return prompt # Return original if formatting fails | |
| def _extract_professional_description(prompt: str) -> str: | |
| """ | |
| Extract the professional description - preserve ALL cinematographic content | |
| Only clean formatting, DO NOT filter content | |
| """ | |
| try: | |
| # Split sections if present | |
| if "CAMERA_SETUP:" in prompt: | |
| description = prompt.split("CAMERA_SETUP:")[0].strip() | |
| elif "2. CAMERA_SETUP" in prompt: | |
| description = prompt.split("2. CAMERA_SETUP")[0].strip() | |
| else: | |
| description = prompt | |
| # Remove only section headers, preserve ALL content | |
| description = re.sub(r'^(DESCRIPTION:|1\.\s*DESCRIPTION:)\s*', '', description, flags=re.IGNORECASE) | |
| # Clean up only formatting issues, preserve ALL professional terminology | |
| # Remove only redundant whitespace | |
| description = re.sub(r'\s+', ' ', description) | |
| description = description.strip() | |
| # If description is too long, preserve the most important parts | |
| # But DO NOT remove cinematographic terms or professional language | |
| if len(description) > 300: | |
| # Only truncate at sentence boundaries to preserve meaning | |
| sentences = re.split(r'[.!?]+', description) | |
| truncated = "" | |
| for sentence in sentences: | |
| if len(truncated + sentence) < 280: | |
| truncated += sentence + ". " | |
| else: | |
| break | |
| if truncated: | |
| description = truncated.strip() | |
| return description | |
| except Exception as e: | |
| logger.warning(f"Professional description extraction failed: {e}") | |
| return prompt | |
| def _extract_camera_setup(prompt: str, analysis_metadata: Optional[Dict[str, Any]]) -> str: | |
| """ | |
| Extract camera setup from BAGEL output or metadata | |
| """ | |
| try: | |
| # First check if BAGEL provided camera setup in the prompt | |
| camera_setup = "" | |
| if "CAMERA_SETUP:" in prompt: | |
| camera_section = prompt.split("CAMERA_SETUP:")[1].strip() | |
| # Take first substantial line | |
| lines = camera_section.split('\n') | |
| for line in lines: | |
| if len(line.strip()) > 20: | |
| camera_setup = line.strip() | |
| break | |
| elif "2. CAMERA_SETUP" in prompt: | |
| camera_section = prompt.split("2. CAMERA_SETUP")[1].strip() | |
| lines = camera_section.split('\n') | |
| for line in lines: | |
| if len(line.strip()) > 20: | |
| camera_setup = line.strip() | |
| break | |
| # If no setup in prompt, check metadata | |
| if not camera_setup and analysis_metadata: | |
| camera_setup = analysis_metadata.get("camera_setup", "") | |
| # Format camera setup if found | |
| if camera_setup: | |
| return _format_camera_setup(camera_setup) | |
| # Return empty if no camera setup (let the description speak for itself) | |
| return "" | |
| except Exception as e: | |
| logger.warning(f"Camera setup extraction failed: {e}") | |
| return "" | |
| def _format_camera_setup(raw_setup: str) -> str: | |
| """ | |
| Format camera setup preserving ALL technical information | |
| """ | |
| try: | |
| # Clean up common prefixes but preserve all technical specs | |
| setup = re.sub(r'^(Based on.*?recommend|I would recommend|For this.*?setup)\s*', '', raw_setup, flags=re.IGNORECASE) | |
| setup = re.sub(r'^(CAMERA_SETUP:|2\.\s*CAMERA_SETUP:?)\s*', '', setup, flags=re.IGNORECASE) | |
| # Ensure proper formatting | |
| if setup and not setup.lower().startswith('shot on'): | |
| setup = f"shot on {setup}" | |
| return setup.strip() | |
| except Exception as e: | |
| logger.warning(f"Camera setup formatting failed: {e}") | |
| return raw_setup | |
| def _format_professional_prompt(description: str, camera_setup: str) -> str: | |
| """ | |
| Format the final prompt preserving ALL professional cinematography content | |
| Structure: [Professional Description] + [Camera Setup] | |
| """ | |
| try: | |
| parts = [] | |
| # Add the rich professional description (preserve ALL content) | |
| if description: | |
| parts.append(description) | |
| # Add camera setup if available | |
| if camera_setup: | |
| parts.append(camera_setup) | |
| # Join with clean formatting | |
| result = ", ".join(parts) | |
| # Clean up only formatting issues | |
| result = re.sub(r'\s*,\s*,+', ',', result) # Remove double commas | |
| result = re.sub(r'\s+', ' ', result) # Clean multiple spaces | |
| result = result.strip().rstrip(',') # Clean edges | |
| # Ensure proper capitalization | |
| if result: | |
| result = result[0].upper() + result[1:] if len(result) > 1 else result.upper() | |
| return result | |
| except Exception as e: | |
| logger.error(f"Professional prompt formatting failed: {e}") | |
| return description if description else "Professional cinematographic photograph" | |
| def calculate_prompt_score(prompt: str, analysis_data: Optional[Dict[str, Any]] = None) -> Tuple[int, Dict[str, int]]: | |
| """ | |
| Calculate enhanced quality score with professional cinematography criteria | |
| Args: | |
| prompt: The prompt to score | |
| analysis_data: Enhanced analysis data with cinematography context | |
| Returns: | |
| Tuple of (total_score, breakdown_dict) | |
| """ | |
| if not prompt: | |
| return 0, {"prompt_quality": 0, "technical_details": 0, "professional_cinematography": 0, "multi_engine_optimization": 0} | |
| breakdown = {} | |
| # Enhanced Prompt Quality (0-25 points) | |
| length_score = min(15, len(prompt) // 15) # Reward appropriate length | |
| detail_score = min(10, len(prompt.split(',')) * 1.5) # Reward structured detail | |
| breakdown["prompt_quality"] = int(length_score + detail_score) | |
| # Technical Details with Cinematography Focus (0-25 points) | |
| tech_score = 0 | |
| # Cinema equipment (higher scores for professional gear) | |
| cinema_equipment = ['Canon EOS R', 'Sony A1', 'Leica', 'Hasselblad', 'Phase One', 'ARRI', 'RED'] | |
| for equipment in cinema_equipment: | |
| if equipment.lower() in prompt.lower(): | |
| tech_score += 8 | |
| break | |
| # Lens specifications | |
| if re.search(r'\d+mm.*f/[\d.]+', prompt): | |
| tech_score += 6 | |
| # ISO settings | |
| if re.search(r'ISO \d+', prompt): | |
| tech_score += 4 | |
| # Professional terminology from professional_photography.py | |
| tech_keywords = ['shot on', 'lens', 'depth of field', 'bokeh', 'composition', 'lighting'] | |
| tech_score += sum(2 for keyword in tech_keywords if keyword in prompt.lower()) | |
| breakdown["technical_details"] = min(25, tech_score) | |
| # Professional Cinematography (0-25 points) - Check for professional_photography.py terms | |
| cinema_score = 0 | |
| # Photographic planes | |
| planes = ['wide shot', 'close-up', 'medium shot', 'extreme wide', 'extreme close-up', 'detail shot'] | |
| cinema_score += sum(4 for plane in planes if plane in prompt.lower()) | |
| # Camera angles | |
| angles = ['low angle', 'high angle', 'eye level', 'dutch angle', 'elevated perspective'] | |
| cinema_score += sum(4 for angle in angles if angle in prompt.lower()) | |
| # Lighting principles | |
| lighting = ['golden hour', 'blue hour', 'natural lighting', 'studio lighting', 'dramatic lighting', 'soft lighting'] | |
| cinema_score += sum(3 for light in lighting if light in prompt.lower()) | |
| # Composition rules | |
| composition = ['rule of thirds', 'leading lines', 'symmetrical', 'centered', 'dynamic composition'] | |
| cinema_score += sum(3 for comp in composition if comp in prompt.lower()) | |
| # Professional context bonus | |
| if analysis_data and analysis_data.get("has_camera_suggestion"): | |
| cinema_score += 6 | |
| breakdown["professional_cinematography"] = min(25, cinema_score) | |
| # Multi-Engine Optimization (0-25 points) | |
| optimization_score = 0 | |
| # Check for complete technical specifications | |
| if re.search(r'(?:Canon|Sony|Leica|Phase One|ARRI|RED)', prompt): | |
| optimization_score += 10 | |
| # Complete technical specs | |
| if re.search(r'shot on.*\d+mm.*f/[\d.]+', prompt): | |
| optimization_score += 8 | |
| # Professional terminology density | |
| pro_terms = ['professional', 'cinematographic', 'shot on', 'composition', 'lighting'] | |
| optimization_score += sum(1 for term in pro_terms if term in prompt.lower()) | |
| # Length efficiency (reward comprehensive but concise) | |
| word_count = len(prompt.split()) | |
| if 40 <= word_count <= 80: # Optimal range for rich but efficient prompts | |
| optimization_score += 5 | |
| elif 20 <= word_count <= 40: | |
| optimization_score += 3 | |
| breakdown["multi_engine_optimization"] = min(25, optimization_score) | |
| # Calculate total | |
| total_score = sum(breakdown.values()) | |
| return total_score, breakdown | |
| def calculate_professional_enhanced_score(prompt: str, analysis_data: Optional[Dict[str, Any]] = None) -> Tuple[int, Dict[str, int]]: | |
| """ | |
| Enhanced scoring with professional cinematography criteria | |
| Args: | |
| prompt: The prompt to score | |
| analysis_data: Analysis data with cinematography context | |
| Returns: | |
| Tuple of (total_score, breakdown_dict) | |
| """ | |
| return calculate_prompt_score(prompt, analysis_data) | |
| def get_score_grade(score: int) -> Dict[str, str]: | |
| """ | |
| Get grade information for a score | |
| Args: | |
| score: Numeric score | |
| Returns: | |
| Dictionary with grade and color information | |
| """ | |
| from config import SCORING_CONFIG | |
| for threshold, grade_info in sorted(SCORING_CONFIG["grade_thresholds"].items(), reverse=True): | |
| if score >= threshold: | |
| return grade_info | |
| # Default to lowest grade | |
| return SCORING_CONFIG["grade_thresholds"][0] | |
| def format_analysis_report(analysis_data: Dict[str, Any], processing_time: float) -> str: | |
| """ | |
| Format analysis data into a readable report with cinematography insights | |
| Args: | |
| analysis_data: Analysis results with cinematography context | |
| processing_time: Time taken for processing | |
| Returns: | |
| Formatted markdown report | |
| """ | |
| model_used = analysis_data.get("model", "Unknown") | |
| prompt_length = len(analysis_data.get("prompt", "")) | |
| has_cinema_context = analysis_data.get("cinematography_context_applied", False) | |
| scene_type = analysis_data.get("scene_type", "general") | |
| report = f"""**π¬ PHRAMER AI ANALYSIS COMPLETE** | |
| **Model:** {model_used} β’ **Time:** {processing_time:.1f}s β’ **Length:** {prompt_length} chars | |
| **π CINEMATOGRAPHY ANALYSIS:** | |
| **Scene Type:** {scene_type.replace('_', ' ').title()} | |
| **Professional Context:** {'β Applied' if has_cinema_context else 'β Not Applied'} | |
| **π― OPTIMIZATIONS APPLIED:** | |
| β Complete professional cinematography analysis | |
| β Preserved all technical and artistic content | |
| β Structured professional prompt format | |
| β Multi-engine compatibility maintained | |
| β Professional photography knowledge integrated | |
| β Cinematographic terminology preserved | |
| **β‘ Powered by Pariente AI for MIA TV Series**""" | |
| return report | |
| def safe_execute(func, *args, **kwargs) -> Tuple[bool, Any]: | |
| """ | |
| Safely execute a function with error handling | |
| Args: | |
| func: Function to execute | |
| *args: Function arguments | |
| **kwargs: Function keyword arguments | |
| Returns: | |
| Tuple of (success: bool, result: Any) | |
| """ | |
| try: | |
| result = func(*args, **kwargs) | |
| return True, result | |
| except Exception as e: | |
| logger.error(f"Safe execution failed for {func.__name__}: {e}") | |
| return False, str(e) | |
| def truncate_text(text: str, max_length: int = 100) -> str: | |
| """ | |
| Truncate text to specified length with ellipsis | |
| Args: | |
| text: Text to truncate | |
| max_length: Maximum length | |
| Returns: | |
| Truncated text | |
| """ | |
| if not text or len(text) <= max_length: | |
| return text | |
| return text[:max_length-3] + "..." | |
| def enhance_prompt_with_cinematography_knowledge(original_prompt: str, scene_type: str = "default") -> str: | |
| """ | |
| Enhance prompt with professional cinematography knowledge | |
| Args: | |
| original_prompt: Base prompt text | |
| scene_type: Detected scene type | |
| Returns: | |
| Enhanced prompt with cinematography context | |
| """ | |
| try: | |
| # Import here to avoid circular imports | |
| from professional_photography import enhance_flux_prompt_with_professional_knowledge | |
| # Apply professional cinematography enhancement | |
| enhanced = enhance_flux_prompt_with_professional_knowledge(original_prompt) | |
| logger.info(f"Enhanced prompt with cinematography knowledge for {scene_type} scene") | |
| return enhanced | |
| except ImportError: | |
| logger.warning("Professional photography module not available") | |
| return original_prompt | |
| except Exception as e: | |
| logger.warning(f"Cinematography enhancement failed: {e}") | |
| return original_prompt | |
| # Export main functions | |
| __all__ = [ | |
| "setup_logging", | |
| "optimize_image", | |
| "validate_image", | |
| "clean_memory", | |
| "apply_flux_rules", | |
| "calculate_prompt_score", | |
| "calculate_professional_enhanced_score", | |
| "get_score_grade", | |
| "format_analysis_report", | |
| "safe_execute", | |
| "truncate_text", | |
| "enhance_prompt_with_cinematography_knowledge", | |
| "detect_scene_type_from_analysis" | |
| ] |