Create utils.py
utils.py
ADDED
"""
Utility functions for FLUX Prompt Optimizer
Clean, focused, and reusable utilities
"""

import re
import logging
import gc
from typing import Optional, Tuple, Dict, Any, List
from PIL import Image
import torch
import numpy as np

from config import PROCESSING_CONFIG, FLUX_RULES

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def setup_logging(level: str = "INFO") -> None:
    """Setup logging configuration"""
    logging.basicConfig(
        level=getattr(logging, level.upper()),
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )

def optimize_image(image: Any) -> Optional[Image.Image]:
    """
    Optimize image for processing

    Args:
        image: Input image (PIL, numpy array, or file path)

    Returns:
        Optimized PIL Image or None if failed
    """
    if image is None:
        return None

    try:
        # Convert to PIL Image if necessary
        if isinstance(image, np.ndarray):
            image = Image.fromarray(image)
        elif isinstance(image, str):
            image = Image.open(image)
        elif not isinstance(image, Image.Image):
            logger.error(f"Unsupported image type: {type(image)}")
            return None

        # Convert to RGB if necessary
        if image.mode != 'RGB':
            image = image.convert('RGB')

        # Resize if too large
        max_size = PROCESSING_CONFIG["max_image_size"]
        if image.size[0] > max_size or image.size[1] > max_size:
            image.thumbnail((max_size, max_size), Image.Resampling.LANCZOS)
            logger.info(f"Image resized to {image.size}")

        return image

    except Exception as e:
        logger.error(f"Image optimization failed: {e}")
        return None

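# Illustrative usage (not part of the original call sites): a quick sketch of how
# optimize_image is expected to behave, assuming config.py defines something like
# PROCESSING_CONFIG = {"max_image_size": 1024}. Inputs may be a path, a numpy array,
# or a PIL image; anything else is rejected and returns None.
#
#     >>> optimize_image(np.zeros((32, 32, 3), dtype=np.uint8)).size
#     (32, 32)
#     >>> optimize_image(Image.new("RGBA", (2048, 2048))).mode   # converted to RGB, downscaled
#     'RGB'
#     >>> optimize_image(12345) is None
#     True
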
def validate_image(image: Any) -> bool:
    """
    Validate if image is processable

    Args:
        image: Input image to validate

    Returns:
        True if valid, False otherwise
    """
    if image is None:
        return False

    try:
        optimized = optimize_image(image)
        return optimized is not None
    except Exception:
        return False


def clean_memory() -> None:
    """Clean up memory and GPU cache"""
    try:
        gc.collect()
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
            torch.cuda.synchronize()
        logger.debug("Memory cleaned")
    except Exception as e:
        logger.warning(f"Memory cleanup failed: {e}")

def apply_flux_rules(prompt: str) -> str:
    """
    Apply Flux optimization rules to a prompt

    Args:
        prompt: Raw prompt text

    Returns:
        Optimized prompt following Flux rules
    """
    if not prompt or not isinstance(prompt, str):
        return ""

    # Clean the prompt from unwanted elements
    cleaned_prompt = prompt
    for pattern in FLUX_RULES["remove_patterns"]:
        cleaned_prompt = re.sub(pattern, '', cleaned_prompt, flags=re.IGNORECASE)

    # Detect image type and add appropriate camera configuration
    prompt_lower = cleaned_prompt.lower()
    camera_config = ""

    if any(word in prompt_lower for word in ['portrait', 'person', 'man', 'woman', 'face']):
        camera_config = FLUX_RULES["camera_configs"]["portrait"]
    elif any(word in prompt_lower for word in ['landscape', 'mountain', 'nature', 'outdoor']):
        camera_config = FLUX_RULES["camera_configs"]["landscape"]
    elif any(word in prompt_lower for word in ['street', 'urban', 'city']):
        camera_config = FLUX_RULES["camera_configs"]["street"]
    else:
        camera_config = FLUX_RULES["camera_configs"]["default"]

    # Add lighting enhancements if not present
    if 'lighting' not in prompt_lower:
        if 'dramatic' in prompt_lower:
            cleaned_prompt += FLUX_RULES["lighting_enhancements"]["dramatic"]
        elif 'portrait' in prompt_lower:
            cleaned_prompt += FLUX_RULES["lighting_enhancements"]["portrait"]
        else:
            cleaned_prompt += FLUX_RULES["lighting_enhancements"]["default"]

    # Build final prompt
    final_prompt = cleaned_prompt + camera_config

    # Clean up formatting
    final_prompt = _clean_prompt_formatting(final_prompt)

    return final_prompt

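# Illustrative sketch of the FLUX_RULES shape this function assumes (the real values
# live in config.py and may differ): "remove_patterns" is a list of regexes stripped
# from the prompt, while "camera_configs" and "lighting_enhancements" map to strings
# appended verbatim, so they should begin with a separator such as ", ".
#
#     hypothetical_rules = {
#         "remove_patterns": [r"\btrending on artstation\b"],
#         "camera_configs": {"portrait": ", shot on 85mm lens", "landscape": ", shot on 24mm lens",
#                            "street": ", shot on 35mm lens", "default": ", shot on 50mm lens"},
#         "lighting_enhancements": {"dramatic": ", dramatic rim lighting",
#                                   "portrait": ", soft window lighting", "default": ", natural lighting"},
#     }
#
# Under such a config, "portrait of a woman reading" would pick up the portrait camera
# string and the portrait lighting string before formatting cleanup.
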
def _clean_prompt_formatting(prompt: str) -> str:
    """Clean up prompt formatting"""
    if not prompt:
        return ""

    prompt = prompt.strip()

    # Clean up spaces and commas first, so capitalization sees the real first character
    prompt = re.sub(r'\s+', ' ', prompt)
    prompt = re.sub(r',\s*,+', ',', prompt)
    prompt = re.sub(r'^\s*,\s*', '', prompt)  # Remove leading commas
    prompt = re.sub(r'\s*,\s*$', '', prompt)  # Remove trailing commas
    prompt = prompt.strip()

    # Ensure it starts with a capital letter
    if prompt:
        prompt = prompt[0].upper() + prompt[1:] if len(prompt) > 1 else prompt.upper()

    return prompt

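# Formatting cleanup in practice (illustrative):
#
#     >>> _clean_prompt_formatting("  , a  quiet   street,,  at dusk , ")
#     'A quiet street, at dusk'
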
def calculate_prompt_score(prompt: str, analysis_data: Optional[Dict[str, Any]] = None) -> Tuple[int, Dict[str, int]]:
    """
    Calculate quality score for a prompt

    Args:
        prompt: The prompt to score
        analysis_data: Optional analysis data to enhance scoring

    Returns:
        Tuple of (total_score, breakdown_dict)
    """
    if not prompt:
        return 0, {"prompt_quality": 0, "technical_details": 0, "artistic_value": 0, "flux_optimization": 0}

    breakdown = {}

    # Prompt quality score (0-30 points)
    length_score = min(20, len(prompt) // 8)  # Reward decent length
    detail_score = min(10, len(prompt.split(',')) * 2)  # Reward detail
    breakdown["prompt_quality"] = length_score + detail_score

    # Technical details score (0-25 points)
    tech_keywords = ['shot on', 'lens', 'photography', 'lighting', 'camera']
    tech_score = sum(5 for keyword in tech_keywords if keyword in prompt.lower())
    breakdown["technical_details"] = min(25, tech_score)

    # Artistic value score (0-25 points)
    art_keywords = ['masterful', 'professional', 'cinematic', 'dramatic', 'beautiful']
    art_score = sum(5 for keyword in art_keywords if keyword in prompt.lower())
    breakdown["artistic_value"] = min(25, art_score)

    # Flux optimization score (0-20 points)
    flux_score = 0
    if any(camera in prompt for camera in FLUX_RULES["camera_configs"].values()):
        flux_score += 10
    if any(lighting in prompt for lighting in FLUX_RULES["lighting_enhancements"].values()):
        flux_score += 10
    breakdown["flux_optimization"] = flux_score

    # Calculate total
    total_score = sum(breakdown.values())

    return total_score, breakdown

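# Worked scoring example (illustrative; the flux_optimization component depends on the
# actual FLUX_RULES strings, so only the first three components are shown). For a
# 96-character prompt with 4 comma-separated segments that contains "lens" and
# "lighting" plus "cinematic" and "dramatic":
#
#     prompt_quality    = min(20, 96 // 8) + min(10, 4 * 2) = 12 + 8 = 20   (max 30)
#     technical_details = min(25, 2 * 5)                     = 10           (max 25)
#     artistic_value    = min(25, 2 * 5)                     = 10           (max 25)
#     flux_optimization = 0, 10, or 20 depending on FLUX_RULES matches      (max 20)
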
def get_score_grade(score: int) -> Dict[str, str]:
    """
    Get grade information for a score

    Args:
        score: Numeric score

    Returns:
        Dictionary with grade and color information
    """
    from config import SCORING_CONFIG

    for threshold, grade_info in sorted(SCORING_CONFIG["grade_thresholds"].items(), reverse=True):
        if score >= threshold:
            return grade_info

    # Default to lowest grade
    return SCORING_CONFIG["grade_thresholds"][0]

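# Illustrative shape for SCORING_CONFIG["grade_thresholds"] (defined in config.py; the
# real keys and values may differ). The loop walks thresholds from highest to lowest and
# returns the first one the score reaches, so a dict keyed by minimum score works:
#
#     hypothetical_thresholds = {
#         90: {"grade": "A", "color": "green"},
#         70: {"grade": "B", "color": "yellow"},
#         0:  {"grade": "C", "color": "red"},   # the 0 key also serves as the fallback
#     }
#     # get_score_grade(75) -> {"grade": "B", "color": "yellow"} under this assumption
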
def format_analysis_report(analysis_data: Dict[str, Any], processing_time: float) -> str:
    """
    Format analysis data into a readable report

    Args:
        analysis_data: Analysis results
        processing_time: Time taken for processing

    Returns:
        Formatted markdown report
    """
    model_used = analysis_data.get("model_used", "Unknown")
    prompt_length = len(analysis_data.get("prompt", ""))

    report = f"""**🚀 FLUX OPTIMIZATION COMPLETE**
**Model:** {model_used} • **Time:** {processing_time:.1f}s • **Length:** {prompt_length} chars

**📊 ANALYSIS SUMMARY:**
{analysis_data.get("summary", "Analysis completed successfully")}

**🎯 OPTIMIZATIONS APPLIED:**
✅ Flux camera configuration
✅ Professional lighting setup
✅ Technical photography details
✅ Artistic enhancement keywords

**⚡ Powered by Pariente AI Research**"""

    return report

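# The report reads only three optional keys from analysis_data, each falling back to a
# default. A minimal call might look like this (illustrative placeholder values):
#
#     format_analysis_report(
#         {"model_used": "example-model", "prompt": "A portrait...", "summary": "Portrait scene detected"},
#         processing_time=2.3,
#     )
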
def safe_execute(func, *args, **kwargs) -> Tuple[bool, Any]:
    """
    Safely execute a function with error handling

    Args:
        func: Function to execute
        *args: Function arguments
        **kwargs: Function keyword arguments

    Returns:
        Tuple of (success: bool, result: Any)
    """
    try:
        result = func(*args, **kwargs)
        return True, result
    except Exception as e:
        logger.error(f"Safe execution failed for {func.__name__}: {e}")
        return False, str(e)

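# Usage sketch: the caller always gets a (success, result) pair, where result is the
# return value on success and the error message string on failure.
#
#     >>> safe_execute(int, "42")
#     (True, 42)
#     >>> safe_execute(int, "not a number")[0]
#     False
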
def truncate_text(text: str, max_length: int = 100) -> str:
    """
    Truncate text to specified length with ellipsis

    Args:
        text: Text to truncate
        max_length: Maximum length

    Returns:
        Truncated text
    """
    if not text or len(text) <= max_length:
        return text

    return text[:max_length - 3] + "..."


# Export main functions
__all__ = [
    "setup_logging",
    "optimize_image",
    "validate_image",
    "clean_memory",
    "apply_flux_rules",
    "calculate_prompt_score",
    "get_score_grade",
    "format_analysis_report",
    "safe_execute",
    "truncate_text"
]
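
# Minimal smoke-test sketch (not in the original file): exercises the pure-text helpers
# end to end without touching images or the GPU. It assumes config.py is importable and
# provides the FLUX_RULES structure referenced above.
if __name__ == "__main__":
    setup_logging("DEBUG")
    optimized = apply_flux_rules("portrait of a woman in a rainy street, cinematic")
    score, breakdown = calculate_prompt_score(optimized)
    logger.info("Optimized prompt: %s", truncate_text(optimized, 120))
    logger.info("Score: %s (%s)", score, breakdown)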