Spaces:
Running
on
Zero
Running
on
Zero
""" | |
Utility functions for FLUX Prompt Optimizer | |
Clean, focused, and reusable utilities | |
""" | |
import re | |
import logging | |
import gc | |
from typing import Optional, Tuple, Dict, Any, List | |
from PIL import Image | |
import torch | |
import numpy as np | |
from config import PROCESSING_CONFIG, FLUX_RULES | |
# Configure logging | |
logging.basicConfig(level=logging.INFO) | |
logger = logging.getLogger(__name__) | |
def setup_logging(level: str = "INFO") -> None: | |
"""Setup logging configuration""" | |
logging.basicConfig( | |
level=getattr(logging, level.upper()), | |
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' | |
) | |
def optimize_image(image: Any) -> Optional[Image.Image]: | |
""" | |
Optimize image for processing | |
Args: | |
image: Input image (PIL, numpy array, or file path) | |
Returns: | |
Optimized PIL Image or None if failed | |
""" | |
if image is None: | |
return None | |
try: | |
# Convert to PIL Image if necessary | |
if isinstance(image, np.ndarray): | |
image = Image.fromarray(image) | |
elif isinstance(image, str): | |
image = Image.open(image) | |
elif not isinstance(image, Image.Image): | |
logger.error(f"Unsupported image type: {type(image)}") | |
return None | |
# Convert to RGB if necessary | |
if image.mode != 'RGB': | |
image = image.convert('RGB') | |
# Resize if too large | |
max_size = PROCESSING_CONFIG["max_image_size"] | |
if image.size[0] > max_size or image.size[1] > max_size: | |
image.thumbnail((max_size, max_size), Image.Resampling.LANCZOS) | |
logger.info(f"Image resized to {image.size}") | |
return image | |
except Exception as e: | |
logger.error(f"Image optimization failed: {e}") | |
return None | |
def validate_image(image: Any) -> bool: | |
""" | |
Validate if image is processable | |
Args: | |
image: Input image to validate | |
Returns: | |
True if valid, False otherwise | |
""" | |
if image is None: | |
return False | |
try: | |
optimized = optimize_image(image) | |
return optimized is not None | |
except Exception: | |
return False | |
def clean_memory() -> None: | |
"""Clean up memory and GPU cache""" | |
try: | |
gc.collect() | |
if torch.cuda.is_available(): | |
torch.cuda.empty_cache() | |
torch.cuda.synchronize() | |
logger.debug("Memory cleaned") | |
except Exception as e: | |
logger.warning(f"Memory cleanup failed: {e}") | |
def apply_flux_rules(prompt: str) -> str: | |
""" | |
Apply Flux optimization rules to a prompt | |
Args: | |
prompt: Raw prompt text | |
Returns: | |
Optimized prompt following Flux rules | |
""" | |
if not prompt or not isinstance(prompt, str): | |
return "" | |
# Clean the prompt from unwanted elements | |
cleaned_prompt = prompt | |
for pattern in FLUX_RULES["remove_patterns"]: | |
cleaned_prompt = re.sub(pattern, '', cleaned_prompt, flags=re.IGNORECASE) | |
# Detect image type and add appropriate camera configuration | |
prompt_lower = cleaned_prompt.lower() | |
camera_config = "" | |
if any(word in prompt_lower for word in ['portrait', 'person', 'man', 'woman', 'face']): | |
camera_config = FLUX_RULES["camera_configs"]["portrait"] | |
elif any(word in prompt_lower for word in ['landscape', 'mountain', 'nature', 'outdoor']): | |
camera_config = FLUX_RULES["camera_configs"]["landscape"] | |
elif any(word in prompt_lower for word in ['street', 'urban', 'city']): | |
camera_config = FLUX_RULES["camera_configs"]["street"] | |
else: | |
camera_config = FLUX_RULES["camera_configs"]["default"] | |
# Add lighting enhancements if not present | |
if 'lighting' not in prompt_lower: | |
if 'dramatic' in prompt_lower: | |
cleaned_prompt += FLUX_RULES["lighting_enhancements"]["dramatic"] | |
elif 'portrait' in prompt_lower: | |
cleaned_prompt += FLUX_RULES["lighting_enhancements"]["portrait"] | |
else: | |
cleaned_prompt += FLUX_RULES["lighting_enhancements"]["default"] | |
# Build final prompt | |
final_prompt = cleaned_prompt + camera_config | |
# Clean up formatting | |
final_prompt = _clean_prompt_formatting(final_prompt) | |
return final_prompt | |
def _clean_prompt_formatting(prompt: str) -> str: | |
"""Clean up prompt formatting""" | |
if not prompt: | |
return "" | |
# Ensure it starts with capital letter | |
prompt = prompt.strip() | |
if prompt: | |
prompt = prompt[0].upper() + prompt[1:] if len(prompt) > 1 else prompt.upper() | |
# Clean up spaces and commas | |
prompt = re.sub(r'\s+', ' ', prompt) | |
prompt = re.sub(r',\s*,+', ',', prompt) | |
prompt = re.sub(r'^\s*,\s*', '', prompt) # Remove leading commas | |
prompt = re.sub(r'\s*,\s*$', '', prompt) # Remove trailing commas | |
return prompt.strip() | |
def calculate_prompt_score(prompt: str, analysis_data: Optional[Dict[str, Any]] = None) -> Tuple[int, Dict[str, int]]: | |
""" | |
Calculate quality score for a prompt | |
Args: | |
prompt: The prompt to score | |
analysis_data: Optional analysis data to enhance scoring | |
Returns: | |
Tuple of (total_score, breakdown_dict) | |
""" | |
if not prompt: | |
return 0, {"prompt_quality": 0, "technical_details": 0, "artistic_value": 0, "flux_optimization": 0} | |
breakdown = {} | |
# Prompt quality score (0-30 points) | |
length_score = min(20, len(prompt) // 8) # Reward decent length | |
detail_score = min(10, len(prompt.split(',')) * 2) # Reward detail | |
breakdown["prompt_quality"] = length_score + detail_score | |
# Technical details score (0-25 points) | |
tech_keywords = ['shot on', 'lens', 'photography', 'lighting', 'camera'] | |
tech_score = sum(5 for keyword in tech_keywords if keyword in prompt.lower()) | |
breakdown["technical_details"] = min(25, tech_score) | |
# Artistic value score (0-25 points) | |
art_keywords = ['masterful', 'professional', 'cinematic', 'dramatic', 'beautiful'] | |
art_score = sum(5 for keyword in art_keywords if keyword in prompt.lower()) | |
breakdown["artistic_value"] = min(25, art_score) | |
# Flux optimization score (0-20 points) | |
flux_score = 0 | |
if any(camera in prompt for camera in FLUX_RULES["camera_configs"].values()): | |
flux_score += 10 | |
if any(lighting in prompt for lighting in FLUX_RULES["lighting_enhancements"].values()): | |
flux_score += 10 | |
breakdown["flux_optimization"] = flux_score | |
# Calculate total | |
total_score = sum(breakdown.values()) | |
return total_score, breakdown | |
def get_score_grade(score: int) -> Dict[str, str]: | |
""" | |
Get grade information for a score | |
Args: | |
score: Numeric score | |
Returns: | |
Dictionary with grade and color information | |
""" | |
from config import SCORING_CONFIG | |
for threshold, grade_info in sorted(SCORING_CONFIG["grade_thresholds"].items(), reverse=True): | |
if score >= threshold: | |
return grade_info | |
# Default to lowest grade | |
return SCORING_CONFIG["grade_thresholds"][0] | |
def format_analysis_report(analysis_data: Dict[str, Any], processing_time: float) -> str: | |
""" | |
Format analysis data into a readable report | |
Args: | |
analysis_data: Analysis results | |
processing_time: Time taken for processing | |
Returns: | |
Formatted markdown report | |
""" | |
model_used = analysis_data.get("model_used", "Unknown") | |
prompt_length = len(analysis_data.get("prompt", "")) | |
report = f"""**π FLUX OPTIMIZATION COMPLETE** | |
**Model:** {model_used} β’ **Time:** {processing_time:.1f}s β’ **Length:** {prompt_length} chars | |
**π ANALYSIS SUMMARY:** | |
{analysis_data.get("summary", "Analysis completed successfully")} | |
**π― OPTIMIZATIONS APPLIED:** | |
β Flux camera configuration | |
β Professional lighting setup | |
β Technical photography details | |
β Artistic enhancement keywords | |
**β‘ Powered by Pariente AI Research**""" | |
return report | |
def safe_execute(func, *args, **kwargs) -> Tuple[bool, Any]: | |
""" | |
Safely execute a function with error handling | |
Args: | |
func: Function to execute | |
*args: Function arguments | |
**kwargs: Function keyword arguments | |
Returns: | |
Tuple of (success: bool, result: Any) | |
""" | |
try: | |
result = func(*args, **kwargs) | |
return True, result | |
except Exception as e: | |
logger.error(f"Safe execution failed for {func.__name__}: {e}") | |
return False, str(e) | |
def truncate_text(text: str, max_length: int = 100) -> str: | |
""" | |
Truncate text to specified length with ellipsis | |
Args: | |
text: Text to truncate | |
max_length: Maximum length | |
Returns: | |
Truncated text | |
""" | |
if not text or len(text) <= max_length: | |
return text | |
return text[:max_length-3] + "..." | |
# Export main functions | |
__all__ = [ | |
"setup_logging", | |
"optimize_image", | |
"validate_image", | |
"clean_memory", | |
"apply_flux_rules", | |
"calculate_prompt_score", | |
"get_score_grade", | |
"format_analysis_report", | |
"safe_execute", | |
"truncate_text" | |
] |