""" Utility functions for FLUX Prompt Optimizer Clean, focused, and reusable utilities """ import re import logging import gc from typing import Optional, Tuple, Dict, Any, List from PIL import Image import torch import numpy as np from config import PROCESSING_CONFIG, FLUX_RULES # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) def setup_logging(level: str = "INFO") -> None: """Setup logging configuration""" logging.basicConfig( level=getattr(logging, level.upper()), format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) def optimize_image(image: Any) -> Optional[Image.Image]: """ Optimize image for processing Args: image: Input image (PIL, numpy array, or file path) Returns: Optimized PIL Image or None if failed """ if image is None: return None try: # Convert to PIL Image if necessary if isinstance(image, np.ndarray): image = Image.fromarray(image) elif isinstance(image, str): image = Image.open(image) elif not isinstance(image, Image.Image): logger.error(f"Unsupported image type: {type(image)}") return None # Convert to RGB if necessary if image.mode != 'RGB': image = image.convert('RGB') # Resize if too large max_size = PROCESSING_CONFIG["max_image_size"] if image.size[0] > max_size or image.size[1] > max_size: image.thumbnail((max_size, max_size), Image.Resampling.LANCZOS) logger.info(f"Image resized to {image.size}") return image except Exception as e: logger.error(f"Image optimization failed: {e}") return None def validate_image(image: Any) -> bool: """ Validate if image is processable Args: image: Input image to validate Returns: True if valid, False otherwise """ if image is None: return False try: optimized = optimize_image(image) return optimized is not None except Exception: return False def clean_memory() -> None: """Clean up memory and GPU cache""" try: gc.collect() if torch.cuda.is_available(): torch.cuda.empty_cache() torch.cuda.synchronize() logger.debug("Memory cleaned") except Exception as e: logger.warning(f"Memory cleanup failed: {e}") def apply_flux_rules(prompt: str) -> str: """ Apply Flux optimization rules to a prompt Args: prompt: Raw prompt text Returns: Optimized prompt following Flux rules """ if not prompt or not isinstance(prompt, str): return "" # Clean the prompt from unwanted elements cleaned_prompt = prompt for pattern in FLUX_RULES["remove_patterns"]: cleaned_prompt = re.sub(pattern, '', cleaned_prompt, flags=re.IGNORECASE) # Detect image type and add appropriate camera configuration prompt_lower = cleaned_prompt.lower() camera_config = "" if any(word in prompt_lower for word in ['portrait', 'person', 'man', 'woman', 'face']): camera_config = FLUX_RULES["camera_configs"]["portrait"] elif any(word in prompt_lower for word in ['landscape', 'mountain', 'nature', 'outdoor']): camera_config = FLUX_RULES["camera_configs"]["landscape"] elif any(word in prompt_lower for word in ['street', 'urban', 'city']): camera_config = FLUX_RULES["camera_configs"]["street"] else: camera_config = FLUX_RULES["camera_configs"]["default"] # Add lighting enhancements if not present if 'lighting' not in prompt_lower: if 'dramatic' in prompt_lower: cleaned_prompt += FLUX_RULES["lighting_enhancements"]["dramatic"] elif 'portrait' in prompt_lower: cleaned_prompt += FLUX_RULES["lighting_enhancements"]["portrait"] else: cleaned_prompt += FLUX_RULES["lighting_enhancements"]["default"] # Build final prompt final_prompt = cleaned_prompt + camera_config # Clean up formatting final_prompt = _clean_prompt_formatting(final_prompt) 
    return final_prompt


def _clean_prompt_formatting(prompt: str) -> str:
    """Clean up prompt formatting"""
    if not prompt:
        return ""

    # Ensure it starts with a capital letter
    prompt = prompt.strip()
    if prompt:
        prompt = prompt[0].upper() + prompt[1:] if len(prompt) > 1 else prompt.upper()

    # Clean up spaces and commas
    prompt = re.sub(r'\s+', ' ', prompt)
    prompt = re.sub(r',\s*,+', ',', prompt)
    prompt = re.sub(r'^\s*,\s*', '', prompt)  # Remove leading commas
    prompt = re.sub(r'\s*,\s*$', '', prompt)  # Remove trailing commas

    return prompt.strip()


def calculate_prompt_score(prompt: str, analysis_data: Optional[Dict[str, Any]] = None) -> Tuple[int, Dict[str, int]]:
    """
    Calculate quality score for a prompt

    Args:
        prompt: The prompt to score
        analysis_data: Optional analysis data to enhance scoring

    Returns:
        Tuple of (total_score, breakdown_dict)
    """
    if not prompt:
        return 0, {"prompt_quality": 0, "technical_details": 0, "artistic_value": 0, "flux_optimization": 0}

    breakdown = {}

    # Prompt quality score (0-30 points)
    length_score = min(20, len(prompt) // 8)  # Reward decent length
    detail_score = min(10, len(prompt.split(',')) * 2)  # Reward detail
    breakdown["prompt_quality"] = length_score + detail_score

    # Technical details score (0-25 points)
    tech_keywords = ['shot on', 'lens', 'photography', 'lighting', 'camera']
    tech_score = sum(5 for keyword in tech_keywords if keyword in prompt.lower())
    breakdown["technical_details"] = min(25, tech_score)

    # Artistic value score (0-25 points)
    art_keywords = ['masterful', 'professional', 'cinematic', 'dramatic', 'beautiful']
    art_score = sum(5 for keyword in art_keywords if keyword in prompt.lower())
    breakdown["artistic_value"] = min(25, art_score)

    # Flux optimization score (0-20 points)
    flux_score = 0
    if any(camera in prompt for camera in FLUX_RULES["camera_configs"].values()):
        flux_score += 10
    if any(lighting in prompt for lighting in FLUX_RULES["lighting_enhancements"].values()):
        flux_score += 10
    breakdown["flux_optimization"] = flux_score

    # Calculate total
    total_score = sum(breakdown.values())

    return total_score, breakdown


def get_score_grade(score: int) -> Dict[str, str]:
    """
    Get grade information for a score

    Args:
        score: Numeric score

    Returns:
        Dictionary with grade and color information
    """
    from config import SCORING_CONFIG

    for threshold, grade_info in sorted(SCORING_CONFIG["grade_thresholds"].items(), reverse=True):
        if score >= threshold:
            return grade_info

    # Default to lowest grade
    return SCORING_CONFIG["grade_thresholds"][0]


def format_analysis_report(analysis_data: Dict[str, Any], processing_time: float) -> str:
    """
    Format analysis data into a readable report

    Args:
        analysis_data: Analysis results
        processing_time: Time taken for processing

    Returns:
        Formatted markdown report
    """
    model_used = analysis_data.get("model_used", "Unknown")
    prompt_length = len(analysis_data.get("prompt", ""))

    report = f"""**🚀 FLUX OPTIMIZATION COMPLETE**

**Model:** {model_used} • **Time:** {processing_time:.1f}s • **Length:** {prompt_length} chars

**📊 ANALYSIS SUMMARY:**
{analysis_data.get("summary", "Analysis completed successfully")}

**🎯 OPTIMIZATIONS APPLIED:**
✅ Flux camera configuration
✅ Professional lighting setup
✅ Technical photography details
✅ Artistic enhancement keywords

**⚡ Powered by Pariente AI Research**"""

    return report


def safe_execute(func, *args, **kwargs) -> Tuple[bool, Any]:
    """
    Safely execute a function with error handling

    Args:
        func: Function to execute
        *args: Function arguments
        **kwargs: Function keyword arguments

    Returns:
        Tuple of (success: bool, result: Any)
    """
    try:
        result = func(*args, **kwargs)
        return True, result
    except Exception as e:
        logger.error(f"Safe execution failed for {func.__name__}: {e}")
        return False, str(e)


def truncate_text(text: str, max_length: int = 100) -> str:
    """
    Truncate text to specified length with ellipsis

    Args:
        text: Text to truncate
        max_length: Maximum length

    Returns:
        Truncated text
    """
    if not text or len(text) <= max_length:
        return text

    return text[:max_length - 3] + "..."


# Export main functions
__all__ = [
    "setup_logging",
    "optimize_image",
    "validate_image",
    "clean_memory",
    "apply_flux_rules",
    "calculate_prompt_score",
    "get_score_grade",
    "format_analysis_report",
    "safe_execute",
    "truncate_text"
]
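

# --- Illustrative usage sketch (not part of the library API) ---
# A minimal, hedged example of how these utilities might be chained together.
# It assumes config.py provides PROCESSING_CONFIG, FLUX_RULES (with
# "remove_patterns", "camera_configs", and "lighting_enhancements" keys) and
# SCORING_CONFIG, exactly as the imports above require; the sample prompt and
# the demo analysis dict below are hypothetical values for demonstration only.
if __name__ == "__main__":
    setup_logging("INFO")

    sample_prompt = "portrait of a woman in dramatic light"  # hypothetical input
    optimized = apply_flux_rules(sample_prompt)
    score, breakdown = calculate_prompt_score(optimized)

    # safe_execute wraps the call and returns (success, result) instead of raising
    ok, report = safe_execute(
        format_analysis_report,
        {"model_used": "demo-model", "prompt": optimized, "summary": "Demo run"},
        0.0,
    )

    print(truncate_text(optimized, 120))
    print(score, breakdown)
    if ok:
        print(report)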