""" Utility functions for FLUX Prompt Optimizer Clean, focused, and reusable utilities """ import re import logging import gc from typing import Optional, Tuple, Dict, Any, List from PIL import Image import torch import numpy as np from config import PROCESSING_CONFIG, FLUX_RULES # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) def setup_logging(level: str = "INFO") -> None: """Setup logging configuration""" logging.basicConfig( level=getattr(logging, level.upper()), format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) def optimize_image(image: Any) -> Optional[Image.Image]: """ Optimize image for processing Args: image: Input image (PIL, numpy array, or file path) Returns: Optimized PIL Image or None if failed """ if image is None: return None try: # Convert to PIL Image if necessary if isinstance(image, np.ndarray): image = Image.fromarray(image) elif isinstance(image, str): image = Image.open(image) elif not isinstance(image, Image.Image): logger.error(f"Unsupported image type: {type(image)}") return None # Convert to RGB if necessary if image.mode != 'RGB': image = image.convert('RGB') # Resize if too large max_size = PROCESSING_CONFIG["max_image_size"] if image.size[0] > max_size or image.size[1] > max_size: image.thumbnail((max_size, max_size), Image.Resampling.LANCZOS) logger.info(f"Image resized to {image.size}") return image except Exception as e: logger.error(f"Image optimization failed: {e}") return None def validate_image(image: Any) -> bool: """ Validate if image is processable Args: image: Input image to validate Returns: True if valid, False otherwise """ if image is None: return False try: optimized = optimize_image(image) return optimized is not None except Exception: return False def clean_memory() -> None: """Clean up memory and GPU cache""" try: gc.collect() if torch.cuda.is_available(): torch.cuda.empty_cache() torch.cuda.synchronize() logger.debug("Memory cleaned") except Exception as e: logger.warning(f"Memory cleanup failed: {e}") def apply_flux_rules(prompt: str, analysis_metadata: Optional[Dict[str, Any]] = None) -> str: """ Apply Flux optimization rules to a prompt Args: prompt: Raw prompt text analysis_metadata: Optional metadata from image analysis including camera suggestions Returns: Optimized prompt following Flux rules """ if not prompt or not isinstance(prompt, str): return "" # Clean the prompt from unwanted elements cleaned_prompt = prompt for pattern in FLUX_RULES["remove_patterns"]: cleaned_prompt = re.sub(pattern, '', cleaned_prompt, flags=re.IGNORECASE) # Extract description part only (remove CAMERA_SETUP section if present) description_part = _extract_description_only(cleaned_prompt) # Check if BAGEL provided intelligent camera setup camera_config = "" if analysis_metadata and analysis_metadata.get("has_camera_suggestion") and analysis_metadata.get("camera_setup"): # Use BAGEL's intelligent camera suggestion - clean and format it properly bagel_camera = analysis_metadata["camera_setup"] camera_config = _format_bagel_camera_suggestion(bagel_camera) logger.info(f"Using BAGEL camera suggestion: {camera_config}") else: # Only use fallback if BAGEL didn't suggest anything camera_config = _get_fallback_camera_config(description_part.lower()) logger.info("Using fallback camera configuration") # Add lighting enhancements if not present and not already covered by BAGEL lighting_enhancement = _get_lighting_enhancement(description_part.lower(), camera_config) # Build final prompt: Description + Camera + Lighting final_prompt = description_part + camera_config + lighting_enhancement # Clean up formatting final_prompt = _clean_prompt_formatting(final_prompt) return final_prompt def _extract_description_only(prompt: str) -> str: """Extract only the description part, removing camera setup sections""" # Remove CAMERA_SETUP section if present if "CAMERA_SETUP:" in prompt: parts = prompt.split("CAMERA_SETUP:") description = parts[0].strip() elif "2. CAMERA_SETUP" in prompt: parts = prompt.split("2. CAMERA_SETUP") description = parts[0].strip() else: description = prompt # Remove "DESCRIPTION:" label if present if description.startswith("DESCRIPTION:"): description = description.replace("DESCRIPTION:", "").strip() elif description.startswith("1. DESCRIPTION:"): description = description.replace("1. DESCRIPTION:", "").strip() # Clean up any remaining camera recommendations from the description description = re.sub(r'For this type of scene.*?shooting style would be.*?\.', '', description, flags=re.DOTALL) description = re.sub(r'I would recommend.*?aperture.*?\.', '', description, flags=re.DOTALL) # Remove numbered section residues (like "2.," at the end) description = re.sub(r'\s*\d+\.\s*,?\s*$', '', description) description = re.sub(r'\s*\d+\.\s*,?\s*', ' ', description) return description.strip() def _format_bagel_camera_suggestion(bagel_camera: str) -> str: """Format BAGEL's camera suggestion into clean FLUX format""" try: # Clean up the BAGEL suggestion camera_text = bagel_camera.strip() # Remove "CAMERA_SETUP:" if it's still there camera_text = re.sub(r'^CAMERA_SETUP:\s*', '', camera_text) # Extract key camera info using regex patterns camera_patterns = { 'camera': r'(Canon EOS [^,]+|Sony A[^,]+|Leica [^,]+|Hasselblad [^,]+|Phase One [^,]+|Fujifilm [^,]+)', 'lens': r'(\d+mm[^,]*|[^,]*lens[^,]*)', 'aperture': r'(f/[\d.]+[^,]*)' } extracted_parts = [] for key, pattern in camera_patterns.items(): match = re.search(pattern, camera_text, re.IGNORECASE) if match: extracted_parts.append(match.group(1).strip()) if extracted_parts: # Build clean camera config camera_info = ', '.join(extracted_parts) return f", Shot on {camera_info}, professional photography" else: # Fallback: use the first part of BAGEL's suggestion first_sentence = camera_text.split('.')[0].strip() if len(first_sentence) > 10: return f", {first_sentence}, professional photography" else: return ", professional camera setup" except Exception as e: logger.warning(f"Failed to format BAGEL camera suggestion: {e}") return ", professional camera setup" def _get_fallback_camera_config(prompt_lower: str) -> str: """Get fallback camera configuration when BAGEL doesn't suggest one""" # Improved detection logic if any(word in prompt_lower for word in ['street', 'urban', 'city', 'documentary', 'crowd', 'protest']): return FLUX_RULES["camera_configs"]["street"] elif any(word in prompt_lower for word in ['portrait', 'person', 'man', 'woman', 'face']) and not any(word in prompt_lower for word in ['street', 'crowd', 'scene']): return FLUX_RULES["camera_configs"]["portrait"] elif any(word in prompt_lower for word in ['landscape', 'mountain', 'nature', 'outdoor']): return FLUX_RULES["camera_configs"]["landscape"] else: return FLUX_RULES["camera_configs"]["default"] def _get_lighting_enhancement(prompt_lower: str, camera_config: str) -> str: """Determine appropriate lighting enhancement""" # Don't add lighting if already mentioned in prompt or camera config if 'lighting' in prompt_lower or 'lighting' in camera_config.lower(): return "" if 'dramatic' in prompt_lower or 'chaos' in prompt_lower or 'fire' in prompt_lower: return FLUX_RULES["lighting_enhancements"]["dramatic"] elif 'portrait' in camera_config.lower(): return FLUX_RULES["lighting_enhancements"]["portrait"] else: return FLUX_RULES["lighting_enhancements"]["default"] def _clean_prompt_formatting(prompt: str) -> str: """Clean up prompt formatting""" if not prompt: return "" # Ensure it starts with capital letter prompt = prompt.strip() if prompt: prompt = prompt[0].upper() + prompt[1:] if len(prompt) > 1 else prompt.upper() # Clean up spaces and commas prompt = re.sub(r'\s+', ' ', prompt) prompt = re.sub(r',\s*,+', ',', prompt) prompt = re.sub(r'^\s*,\s*', '', prompt) # Remove leading commas prompt = re.sub(r'\s*,\s*$', '', prompt) # Remove trailing commas # Remove redundant periods prompt = re.sub(r'\.+', '.', prompt) return prompt.strip() def calculate_prompt_score(prompt: str, analysis_data: Optional[Dict[str, Any]] = None) -> Tuple[int, Dict[str, int]]: """ Calculate quality score for a prompt Args: prompt: The prompt to score analysis_data: Optional analysis data to enhance scoring Returns: Tuple of (total_score, breakdown_dict) """ if not prompt: return 0, {"prompt_quality": 0, "technical_details": 0, "artistic_value": 0, "flux_optimization": 0} breakdown = {} # Prompt quality score (0-30 points) length_score = min(20, len(prompt) // 8) # Reward decent length detail_score = min(10, len(prompt.split(',')) * 2) # Reward detail breakdown["prompt_quality"] = length_score + detail_score # Technical details score (0-25 points) - Enhanced for BAGEL camera suggestions tech_score = 0 tech_keywords = ['shot on', 'lens', 'photography', 'lighting', 'camera'] for keyword in tech_keywords: if keyword in prompt.lower(): tech_score += 5 # Bonus points for BAGEL camera suggestions if analysis_data and analysis_data.get("has_camera_suggestion"): tech_score += 10 # Higher bonus for intelligent camera selection breakdown["technical_details"] = min(25, tech_score) # Artistic value score (0-25 points) art_keywords = ['masterful', 'professional', 'cinematic', 'dramatic', 'beautiful'] art_score = sum(5 for keyword in art_keywords if keyword in prompt.lower()) breakdown["artistic_value"] = min(25, art_score) # Flux optimization score (0-20 points) flux_score = 0 # Check for camera configuration (prefer BAGEL over fallback) if analysis_data and analysis_data.get("has_camera_suggestion"): flux_score += 15 # Higher score for BAGEL suggestions elif any(camera in prompt for camera in FLUX_RULES["camera_configs"].values()): flux_score += 10 # Lower score for fallback # Check for lighting configuration if any(lighting in prompt for lighting in FLUX_RULES["lighting_enhancements"].values()): flux_score += 5 breakdown["flux_optimization"] = flux_score # Calculate total total_score = sum(breakdown.values()) return total_score, breakdown def get_score_grade(score: int) -> Dict[str, str]: """ Get grade information for a score Args: score: Numeric score Returns: Dictionary with grade and color information """ from config import SCORING_CONFIG for threshold, grade_info in sorted(SCORING_CONFIG["grade_thresholds"].items(), reverse=True): if score >= threshold: return grade_info # Default to lowest grade return SCORING_CONFIG["grade_thresholds"][0] def format_analysis_report(analysis_data: Dict[str, Any], processing_time: float) -> str: """ Format analysis data into a readable report Args: analysis_data: Analysis results processing_time: Time taken for processing Returns: Formatted markdown report """ model_used = analysis_data.get("model_used", "Unknown") prompt_length = len(analysis_data.get("prompt", "")) report = f"""**🚀 FLUX OPTIMIZATION COMPLETE** **Model:** {model_used} • **Time:** {processing_time:.1f}s • **Length:** {prompt_length} chars **📊 ANALYSIS SUMMARY:** {analysis_data.get("summary", "Analysis completed successfully")} **🎯 OPTIMIZATIONS APPLIED:** ✅ Flux camera configuration ✅ Professional lighting setup ✅ Technical photography details ✅ Artistic enhancement keywords **⚡ Powered by Frame 0 Laboratory for MIA**""" return report def safe_execute(func, *args, **kwargs) -> Tuple[bool, Any]: """ Safely execute a function with error handling Args: func: Function to execute *args: Function arguments **kwargs: Function keyword arguments Returns: Tuple of (success: bool, result: Any) """ try: result = func(*args, **kwargs) return True, result except Exception as e: logger.error(f"Safe execution failed for {func.__name__}: {e}") return False, str(e) def truncate_text(text: str, max_length: int = 100) -> str: """ Truncate text to specified length with ellipsis Args: text: Text to truncate max_length: Maximum length Returns: Truncated text """ if not text or len(text) <= max_length: return text return text[:max_length-3] + "..." # Export main functions __all__ = [ "setup_logging", "optimize_image", "validate_image", "clean_memory", "apply_flux_rules", "calculate_prompt_score", "get_score_grade", "format_analysis_report", "safe_execute", "truncate_text" ]