Spaces:
Sleeping
Sleeping
import json | |
import re | |
from PIL import Image | |
from PIL.PngImagePlugin import PngInfo | |
import logging | |
logger = logging.getLogger(__name__) | |
def extract_png_metadata(image_path: str) -> dict:
    """
    Collect generation metadata embedded in a PNG file.

    The image is passed to the A1111 extractor, then the ComfyUI extractor,
    then the generic extractor. Results are merged in that order, so later
    extractors overwrite keys set by earlier ones. Whenever the A1111 or
    ComfyUI extractor returns something, ``source`` is set to its tag; if
    both do, the ComfyUI tag wins because it runs last.

    Args:
        image_path: Path to the PNG image file

    Returns:
        Dictionary containing the merged metadata, or None when nothing was
        found or when opening/reading the image raised an exception (the
        error is logged).
    """
    try:
        with Image.open(image_path) as image:
            collected = {}

            # Generator-specific extractors, paired with the tag recorded
            # under 'source' when they yield any data.
            tagged_extractors = (
                (extract_a1111_metadata, 'automatic1111'),
                (extract_comfyui_metadata, 'comfyui'),
            )
            for extractor, source_tag in tagged_extractors:
                found = extractor(image)
                if found:
                    collected.update(found)
                    collected['source'] = source_tag

            # Generic fields (EXIF, free-form text chunks) never set 'source'.
            extras = extract_other_metadata(image)
            if extras:
                collected.update(extras)

            return collected or None
    except Exception as e:
        logger.error(f"Error extracting metadata from {image_path}: {str(e)}")
        return None
def extract_a1111_metadata(img: Image.Image) -> dict:
    """
    Extract Automatic1111 metadata from PNG text fields.

    Args:
        img: Opened image; its ``text`` mapping is consulted when present.

    Returns:
        The parsed contents of the 'parameters' text field, or an empty
        dictionary when the field is absent or an error occurs (logged).
    """
    try:
        # A1111 writes everything into a single 'parameters' text chunk.
        if not hasattr(img, 'text') or 'parameters' not in img.text:
            return {}
        return dict(parse_a1111_parameters(img.text['parameters']))
    except Exception as e:
        logger.error(f"Error extracting A1111 metadata: {str(e)}")
        return {}
def parse_a1111_parameters(parameters_text: str) -> dict:
    """
    Parse A1111 parameters text into structured data.

    The text is treated as three consecutive sections:

    1. the prompt (one or more lines),
    2. an optional negative prompt, starting at the first line that begins
       with ``Negative prompt:`` (may also span several lines),
    3. the settings section, starting at the LAST line that begins with
       ``Steps: <number>`` and running to the end of the text.

    Key/value settings are searched only inside the settings section so that
    look-alike text inside a prompt (e.g. "Steps: 3") cannot shadow the real
    values. When no settings line is found, the whole text is searched and
    every line before any negative-prompt marker counts as the prompt.

    Args:
        parameters_text: Raw content of the 'parameters' PNG text field.

    Returns:
        Dictionary that always contains 'prompt' (possibly empty) and, when
        present in the text, 'negative_prompt', 'steps', 'sampler',
        'cfg_scale', 'seed', 'size', 'width', 'height', 'model_hash',
        'model', 'denoising_strength', 'clip_skip' and 'ensd'. Returns an
        empty dictionary if an unexpected error occurs (logged).
    """
    negative_marker = 'Negative prompt:'
    param_patterns = {
        'steps': r'Steps:\s*(\d+)',
        'sampler': r'Sampler:\s*([^,\n]+)',
        'cfg_scale': r'CFG scale:\s*([\d.]+)',
        'seed': r'Seed:\s*(\d+)',
        'size': r'Size:\s*(\d+x\d+)',
        'model_hash': r'Model hash:\s*([a-fA-F0-9]+)',
        'model': r'Model:\s*([^,\n]+)',
        'denoising_strength': r'Denoising strength:\s*([\d.]+)',
        'clip_skip': r'Clip skip:\s*(\d+)',
        'ensd': r'ENSD:\s*(\d+)',
    }
    int_params = ('steps', 'seed', 'clip_skip', 'ensd')
    float_params = ('cfg_scale', 'denoising_strength')

    try:
        metadata = {}
        lines = parameters_text.strip().split('\n')

        # The settings section begins at the last line that opens with
        # "Steps: <n>"; scanning from the end keeps prompt text that merely
        # mentions "Steps:" from being mistaken for it.
        settings_index = next(
            (
                index
                for index in range(len(lines) - 1, -1, -1)
                if re.match(r'\s*Steps:\s*\d+', lines[index])
            ),
            None,
        )
        if settings_index is None:
            body_lines = lines
            settings_text = parameters_text
        else:
            body_lines = lines[:settings_index]
            settings_text = '\n'.join(lines[settings_index:])

        # Split the remaining lines into prompt / negative prompt.
        negative_index = next(
            (
                index
                for index, line in enumerate(body_lines)
                if line.lstrip().startswith(negative_marker)
            ),
            None,
        )
        if negative_index is None:
            prompt_lines = body_lines
        else:
            prompt_lines = body_lines[:negative_index]
            negative_text = '\n'.join(body_lines[negative_index:]).lstrip()
            negative_text = negative_text[len(negative_marker):].strip()
            if negative_text:
                metadata['negative_prompt'] = negative_text

        metadata['prompt'] = '\n'.join(prompt_lines).strip()

        # Extract the individual settings.
        for param_name, pattern in param_patterns.items():
            match = re.search(pattern, settings_text)
            if not match:
                continue
            value = match.group(1).strip()
            try:
                if param_name in int_params:
                    metadata[param_name] = int(value)
                elif param_name in float_params:
                    metadata[param_name] = float(value)
                else:
                    metadata[param_name] = value
            except ValueError:
                # A single malformed number (e.g. "1.2.3") must not discard
                # everything else that parsed correctly.
                logger.warning(f"Ignoring malformed A1111 value for {param_name}: {value}")

        # Parse size into width and height
        if 'size' in metadata:
            size_match = re.match(r'(\d+)x(\d+)', metadata['size'])
            if size_match:
                metadata['width'] = int(size_match.group(1))
                metadata['height'] = int(size_match.group(2))

        return metadata
    except Exception as e:
        logger.error(f"Error parsing A1111 parameters: {str(e)}")
        return {}
def extract_comfyui_metadata(img: Image.Image) -> dict:
    """
    Extract ComfyUI metadata from PNG text fields.

    The 'workflow' and 'prompt' text fields are each decoded as JSON and
    handed to their parser, in that order; values produced from 'prompt'
    overwrite same-named values produced from 'workflow'. A field that is
    not valid JSON is skipped with a warning.

    Args:
        img: Opened image; its ``text`` mapping is consulted when present.

    Returns:
        Dictionary with whatever the parsers produced, or an empty
        dictionary when nothing was found or an error occurred (logged).
    """
    try:
        metadata = {}
        if not hasattr(img, 'text'):
            return metadata

        # (text field, parser, warning emitted when the field is not JSON)
        chunk_handlers = (
            ('workflow', parse_comfyui_workflow, "Could not parse ComfyUI workflow JSON"),
            ('prompt', parse_comfyui_prompt, "Could not parse ComfyUI prompt JSON"),
        )
        for field_name, parser, warning_text in chunk_handlers:
            if field_name not in img.text:
                continue
            try:
                decoded = json.loads(img.text[field_name])
                metadata.update(parser(decoded))
            except json.JSONDecodeError:
                logger.warning(warning_text)

        return metadata
    except Exception as e:
        logger.error(f"Error extracting ComfyUI metadata: {str(e)}")
        return {}
def parse_comfyui_workflow(workflow_data: dict) -> dict:
    """
    Parse ComfyUI workflow data (the UI graph stored in the 'workflow' field).

    Two kinds of nodes are inspected, based on their lower-cased ``type``:

    * types containing "text" or "prompt": the first widget value, when it
      is a string longer than 10 characters, becomes 'prompt' (first such
      node wins);
    * types containing "sampler": the positional ``widgets_values`` list is
      matched against known layouts to read steps / cfg / sampler name and,
      where the layout has one, the seed. Later sampler nodes overwrite
      earlier ones.

    Widget values are positional, so each layout is accepted only when the
    value types at all of its positions fit (int steps, numeric cfg, string
    sampler name). Keys are omitted rather than set to None when no layout
    fits. Malformed nodes (non-dict node, missing/None type, non-list
    ``widgets_values``) are skipped instead of aborting the whole parse.

    Args:
        workflow_data: Decoded workflow JSON; expected to hold a 'nodes' list.

    Returns:
        Dictionary with any of 'prompt', 'steps', 'cfg_scale', 'sampler',
        'seed'. Empty when nothing matched or an unexpected error occurred
        (logged).
    """
    # (seed, steps, cfg, sampler) positions inside widgets_values, tried in
    # order; the sampler position is the largest index of every layout.
    sampler_layouts = (
        # KSampler: seed, control_after_generate, steps, cfg, sampler_name, ...
        (0, 2, 3, 4),
        # KSamplerAdvanced: add_noise, noise_seed, control_after_generate,
        # steps, cfg, sampler_name, ...
        (1, 3, 4, 5),
        # Fallback kept for nodes that serialise steps, cfg, sampler first.
        (None, 0, 1, 2),
    )

    def is_int(value):
        # bool is a subclass of int but is never a step count or seed.
        return isinstance(value, int) and not isinstance(value, bool)

    def is_number(value):
        return isinstance(value, (int, float)) and not isinstance(value, bool)

    try:
        metadata = {}
        nodes = workflow_data.get('nodes') if isinstance(workflow_data, dict) else None
        if not isinstance(nodes, (list, tuple)):
            return metadata

        for node in nodes:
            if not isinstance(node, dict):
                continue
            node_type = str(node.get('type') or '').lower()
            values = node.get('widgets_values')
            if not isinstance(values, list):
                continue

            # Extract prompt from text nodes
            if 'text' in node_type or 'prompt' in node_type:
                if values and 'prompt' not in metadata:
                    text_value = values[0]
                    if isinstance(text_value, str) and len(text_value) > 10:
                        metadata['prompt'] = text_value

            # Extract sampler settings
            elif 'sampler' in node_type:
                for seed_at, steps_at, cfg_at, sampler_at in sampler_layouts:
                    if len(values) <= sampler_at:
                        continue
                    if not (
                        is_int(values[steps_at])
                        and is_number(values[cfg_at])
                        and isinstance(values[sampler_at], str)
                    ):
                        continue
                    metadata['steps'] = values[steps_at]
                    metadata['cfg_scale'] = values[cfg_at]
                    metadata['sampler'] = values[sampler_at]
                    if seed_at is not None and is_int(values[seed_at]):
                        metadata['seed'] = values[seed_at]
                    break

        return metadata
    except Exception as e:
        logger.error(f"Error parsing ComfyUI workflow: {str(e)}")
        return {}
def parse_comfyui_prompt(prompt_data: dict) -> dict:
    """
    Parse ComfyUI prompt data (the node-id -> node mapping in the 'prompt' field).

    For every node that has an ``inputs`` mapping:

    * the first string input longer than 10 characters whose key contains
      "text" or "prompt" becomes 'prompt' (first match across nodes wins);
    * the inputs 'steps', 'cfg', 'seed' / 'noise_seed' and 'denoise' are
      copied to 'steps', 'cfg_scale', 'seed' and 'denoising_strength'. When
      several nodes provide the same input, the last one wins.

    Only literal numbers are accepted for the numeric inputs. An input that
    is wired to another node is stored as a ``[node_id, slot]`` list rather
    than a value, and is skipped so that it cannot end up as e.g. 'steps'.
    Nodes whose ``inputs`` is not a mapping are skipped.

    Args:
        prompt_data: Decoded prompt JSON.

    Returns:
        Dictionary with any of 'prompt', 'steps', 'cfg_scale', 'seed',
        'denoising_strength'. Empty when nothing matched or an unexpected
        error occurred (logged).
    """
    # input name inside a node -> key used in the returned metadata
    numeric_inputs = {
        'steps': 'steps',
        'cfg': 'cfg_scale',
        'seed': 'seed',
        'noise_seed': 'seed',
        'denoise': 'denoising_strength',
    }

    try:
        metadata = {}
        if not isinstance(prompt_data, dict):
            return metadata

        for node_data in prompt_data.values():
            if not isinstance(node_data, dict):
                continue
            inputs = node_data.get('inputs')
            if not isinstance(inputs, dict):
                continue

            # Look for text inputs (prompts)
            if 'prompt' not in metadata:
                for key, value in inputs.items():
                    if not (isinstance(value, str) and len(value) > 10):
                        continue
                    lowered_key = str(key).lower()
                    if 'text' in lowered_key or 'prompt' in lowered_key:
                        metadata['prompt'] = value
                        break

            # Look for numeric parameters, ignoring links and booleans
            for input_name, target_key in numeric_inputs.items():
                value = inputs.get(input_name)
                if isinstance(value, (int, float)) and not isinstance(value, bool):
                    metadata[target_key] = value

        return metadata
    except Exception as e:
        logger.error(f"Error parsing ComfyUI prompt: {str(e)}")
        return {}
def extract_other_metadata(img: Image.Image) -> dict:
    """
    Extract other common metadata fields.

    Two independent sources are consulted:

    * EXIF, via the image's ``_getexif()`` when it has one: the Software
      (0x0131), Artist (0x013B) and Copyright (0x8298) tags are copied to
      'software', 'artist' and 'copyright'. EXIF is decoded once, and a
      failure while decoding it is logged without preventing the text-field
      lookup below.
    * Text fields 'description', 'comment', 'title' and 'subject', checked
      in that order: the first one whose stripped value is longer than 10
      characters becomes 'prompt'.

    Args:
        img: Opened image.

    Returns:
        Dictionary with any of 'software', 'artist', 'copyright', 'prompt';
        empty when nothing was found or an unexpected error occurred (logged).
    """
    exif_fields = {
        'software': 0x0131,   # Software tag
        'artist': 0x013B,     # Artist tag
        'copyright': 0x8298,  # Copyright tag
    }
    text_fields = ('description', 'comment', 'title', 'subject')

    try:
        metadata = {}

        # Check standard EXIF data
        # NOTE(review): _getexif is a private Pillow API; the public
        # Image.getexif() may be a drop-in replacement - confirm before
        # switching.
        try:
            exif_data = img._getexif() if hasattr(img, '_getexif') else None
        except Exception as e:
            logger.warning(f"Could not read EXIF data: {str(e)}")
            exif_data = None
        if exif_data:
            for field_name, tag_id in exif_fields.items():
                if tag_id in exif_data:
                    metadata[field_name] = exif_data[tag_id]

        # Check for other text fields that might contain prompts
        if hasattr(img, 'text'):
            for field in text_fields:
                if field in img.text:
                    value = img.text[field].strip()
                    if len(value) > 10 and 'prompt' not in metadata:
                        metadata['prompt'] = value

        return metadata
    except Exception as e:
        logger.error(f"Error extracting other metadata: {str(e)}")
        return {}
def clean_prompt_text(prompt: str) -> str:
    """
    Clean and normalize prompt text.

    Leading/trailing whitespace is removed and every run of whitespace is
    collapsed to a single space. A fixed list of label prefixes is then
    tested once each, in order and case-insensitively, against the
    progressively stripped text; each one found at the start is removed.

    Args:
        prompt: Text to clean.

    Returns:
        The cleaned text, or "" for a falsy input. If cleaning raises, the
        input is returned unchanged.
    """
    removable_prefixes = (
        'prompt:', 'positive prompt:', 'text prompt:',
        'description:', 'caption:',
    )
    try:
        if not prompt:
            return ""

        # Collapse whitespace
        cleaned = re.sub(r'\s+', ' ', prompt.strip())

        # Drop label prefixes
        for label in removable_prefixes:
            if not cleaned.lower().startswith(label):
                continue
            cleaned = cleaned[len(label):].strip()

        return cleaned
    except Exception:
        return prompt if prompt else ""
def get_generation_parameters(metadata: dict) -> dict:
    """
    Extract key generation parameters for display.

    Args:
        metadata: Extracted metadata dictionary.

    Returns:
        Dictionary limited to the display keys present in ``metadata``:
        'prompt' and 'negative_prompt' (both passed through
        clean_prompt_text), then 'steps', 'cfg_scale', 'sampler', 'seed',
        'model', 'width', 'height', and finally 'source'. Returns an empty
        dictionary if an error occurs (logged).
    """
    text_keys = ('prompt', 'negative_prompt')
    # Technical parameters followed by the source information
    passthrough_keys = (
        'steps', 'cfg_scale', 'sampler', 'seed', 'model', 'width', 'height',
        'source',
    )
    try:
        selected = {}

        for key in text_keys:
            if key in metadata:
                selected[key] = clean_prompt_text(metadata[key])

        for key in passthrough_keys:
            if key in metadata:
                selected[key] = metadata[key]

        return selected
    except Exception as e:
        logger.error(f"Error extracting generation parameters: {str(e)}")
        return {}