#!/usr/bin/env python3
import os
import glob
import base64
import streamlit as st
import pandas as pd
import torch
from torch.utils.data import Dataset, DataLoader
import csv
import time
from dataclasses import dataclass
from typing import Optional
import zipfile
import math
from PIL import Image
import random
import logging
import numpy as np
import cv2
from diffusers import DiffusionPipeline
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
log_records = []
class LogCaptureHandler(logging.Handler):
def emit(self, record):
log_records.append(record)
logger.addHandler(LogCaptureHandler())
st.set_page_config(
page_title="SFT Tiny Titans 🚀",
page_icon="🤖",
layout="wide",
initial_sidebar_state="expanded",
menu_items={
'Get Help': 'https://huggingface.co/awacke1',
'Report a Bug': 'https://huggingface.co/spaces/awacke1',
'About': "Tiny Titans: Small diffusion models, big CV dreams! 🌌"
}
)
if 'captured_images' not in st.session_state:
st.session_state['captured_images'] = []
if 'cv_builder' not in st.session_state:
st.session_state['cv_builder'] = None
if 'cv_loaded' not in st.session_state:
st.session_state['cv_loaded'] = False
if 'active_tab' not in st.session_state:
st.session_state['active_tab'] = "Build Titan 🌱"
@dataclass
class DiffusionConfig:
"""Config for our diffusion heroes 🦸‍♂️ - Keeps the blueprint snappy!"""
name: str
base_model: str
size: str
@property
def model_path(self):
return f"diffusion_models/{self.name}"
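# Illustrative example: DiffusionConfig(name="tiny-titan-1710000000", base_model="runwayml/stable-diffusion-v1-5",
# size="small").model_path -> "diffusion_models/tiny-titan-1710000000"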
class DiffusionDataset(Dataset):
"""Pixel party platter 🍕 - Images and text for diffusion delight!"""
def __init__(self, images, texts):
self.images = images
self.texts = texts
def __len__(self):
return len(self.images)
def __getitem__(self, idx):
return {"image": self.images[idx], "text": self.texts[idx]}
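# Illustrative usage sketch: each item is a {"image": PIL.Image, "text": str} pair, so the DataLoaders
# below pass a small custom collate_fn (default_collate rejects PIL images), e.g.:
#   DataLoader(DiffusionDataset(images, texts), batch_size=1,
#              collate_fn=lambda items: {"image": [i["image"] for i in items],
#                                        "text": [i["text"] for i in items]})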
class MicroDiffusionBuilder:
"""Tiny titan of diffusion 🐣 - Small but mighty for quick demos!"""
def __init__(self):
self.config = None
self.pipeline = None
self.jokes = ["Micro but mighty! 💪", "Small pixels, big dreams! 🌟"]
def load_model(self, model_path: str, config: Optional[DiffusionConfig] = None):
try:
with st.spinner(f"Loading {model_path}... ⏳ (Tiny titan powering up!)"):
self.pipeline = DiffusionPipeline.from_pretrained(model_path, low_cpu_mem_usage=True)
self.pipeline.to("cuda" if torch.cuda.is_available() else "cpu")
if config:
self.config = config
st.success(f"Model loaded! 🎉 {random.choice(self.jokes)}")
logger.info(f"Loaded Micro Diffusion: {model_path}")
except Exception as e:
st.error(f"Failed to load {model_path}: {str(e)} 💥 (Tiny titan tripped!)")
logger.error(f"Failed to load {model_path}: {str(e)}")
raise
return self
def fine_tune_sft(self, images, texts, epochs=3):
try:
dataset = DiffusionDataset(images, texts)
            dataloader = DataLoader(dataset, batch_size=1, shuffle=True, collate_fn=lambda items: {"image": [i["image"] for i in items], "text": [i["text"] for i in items]})  # default_collate cannot batch PIL images
optimizer = torch.optim.AdamW(self.pipeline.unet.parameters(), lr=1e-5)
self.pipeline.unet.train()
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
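            # SFT recipe (assumes an SD-style pipeline exposing vae/text_encoder/tokenizer/unet):
            # encode each image to VAE latents, add noise at a random timestep, and train the
            # U-Net to predict that noise with an MSE loss.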
for epoch in range(epochs):
with st.spinner(f"Epoch {epoch + 1}/{epochs}... ⚙️ (Micro titan flexing!)"):
total_loss = 0
for batch in dataloader:
optimizer.zero_grad()
                        image = batch["image"][0]  # PIL image; converted to a tensor below (PIL images have no .to())
                        text = batch["text"][0]
                        pixels = torch.tensor(np.array(image)).permute(2, 0, 1).unsqueeze(0).float().to(device) / 127.5 - 1.0  # scale pixels to [-1, 1] for the VAE
                        latents = self.pipeline.vae.encode(pixels).latent_dist.sample()
noise = torch.randn_like(latents)
                        timesteps = torch.randint(0, self.pipeline.scheduler.config.num_train_timesteps, (latents.shape[0],), device=latents.device)
noisy_latents = self.pipeline.scheduler.add_noise(latents, noise, timesteps)
text_embeddings = self.pipeline.text_encoder(self.pipeline.tokenizer(text, return_tensors="pt").input_ids.to(device))[0]
pred_noise = self.pipeline.unet(noisy_latents, timesteps, encoder_hidden_states=text_embeddings).sample
loss = torch.nn.functional.mse_loss(pred_noise, noise)
loss.backward()
optimizer.step()
total_loss += loss.item()
st.write(f"Epoch {epoch + 1} done! Loss: {total_loss / len(dataloader):.4f}")
st.success(f"Micro Diffusion tuned! 🎉 {random.choice(self.jokes)}")
logger.info(f"Fine-tuned Micro Diffusion: {self.config.name}")
except Exception as e:
st.error(f"Tuning failed: {str(e)} 💥 (Micro snag!)")
logger.error(f"Tuning failed: {str(e)}")
raise
return self
def save_model(self, path: str):
try:
with st.spinner("Saving model... 💾 (Packing tiny pixels!)"):
os.makedirs(os.path.dirname(path), exist_ok=True)
self.pipeline.save_pretrained(path)
st.success(f"Saved at {path}! ✅ Tiny titan secured!")
logger.info(f"Saved at {path}")
except Exception as e:
st.error(f"Save failed: {str(e)} 💥 (Packing mishap!)")
logger.error(f"Save failed: {str(e)}")
raise
def generate(self, prompt: str):
try:
return self.pipeline(prompt, num_inference_steps=20).images[0]
except Exception as e:
st.error(f"Generation failed: {str(e)} 💥 (Pixel oopsie!)")
logger.error(f"Generation failed: {str(e)}")
raise
class LatentDiffusionBuilder:
"""Scaled-down dreamer 🌙 - Latent magic for efficient artistry!"""
def __init__(self):
self.config = None
self.pipeline = None
self.jokes = ["Latent vibes only! 🌀", "Small scale, big style! 🎨"]
def load_model(self, model_path: str, config: Optional[DiffusionConfig] = None):
try:
with st.spinner(f"Loading {model_path}... ⏳ (Latent titan rising!)"):
self.pipeline = DiffusionPipeline.from_pretrained(model_path, low_cpu_mem_usage=True)
                # Keep the full U-Net: truncating it with nn.Sequential(children()[:2]) breaks its
                # forward signature (no timesteps/encoder_hidden_states) and downstream generation.
self.pipeline.to("cuda" if torch.cuda.is_available() else "cpu")
if config:
self.config = config
st.success(f"Model loaded! 🎉 {random.choice(self.jokes)}")
logger.info(f"Loaded Latent Diffusion: {model_path}")
except Exception as e:
st.error(f"Failed to load {model_path}: {str(e)} 💥 (Latent hiccup!)")
logger.error(f"Failed to load {model_path}: {str(e)}")
raise
return self
def fine_tune_sft(self, images, texts, epochs=3):
try:
dataset = DiffusionDataset(images, texts)
            dataloader = DataLoader(dataset, batch_size=1, shuffle=True, collate_fn=lambda items: {"image": [i["image"] for i in items], "text": [i["text"] for i in items]})  # default_collate cannot batch PIL images
optimizer = torch.optim.AdamW(self.pipeline.unet.parameters(), lr=1e-5)
self.pipeline.unet.train()
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
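            # Same noise-prediction SFT loop as MicroDiffusionBuilder: MSE between predicted and true noise on VAE latents.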
for epoch in range(epochs):
with st.spinner(f"Epoch {epoch + 1}/{epochs}... ⚙️ (Latent titan shaping up!)"):
total_loss = 0
for batch in dataloader:
optimizer.zero_grad()
                        image = batch["image"][0]  # PIL image; converted to a tensor below (PIL images have no .to())
                        text = batch["text"][0]
                        pixels = torch.tensor(np.array(image)).permute(2, 0, 1).unsqueeze(0).float().to(device) / 127.5 - 1.0  # scale pixels to [-1, 1] for the VAE
                        latents = self.pipeline.vae.encode(pixels).latent_dist.sample()
noise = torch.randn_like(latents)
                        timesteps = torch.randint(0, self.pipeline.scheduler.config.num_train_timesteps, (latents.shape[0],), device=latents.device)
noisy_latents = self.pipeline.scheduler.add_noise(latents, noise, timesteps)
text_embeddings = self.pipeline.text_encoder(self.pipeline.tokenizer(text, return_tensors="pt").input_ids.to(device))[0]
pred_noise = self.pipeline.unet(noisy_latents, timesteps, encoder_hidden_states=text_embeddings).sample
loss = torch.nn.functional.mse_loss(pred_noise, noise)
loss.backward()
optimizer.step()
total_loss += loss.item()
st.write(f"Epoch {epoch + 1} done! Loss: {total_loss / len(dataloader):.4f}")
st.success(f"Latent Diffusion tuned! 🎉 {random.choice(self.jokes)}")
logger.info(f"Fine-tuned Latent Diffusion: {self.config.name}")
except Exception as e:
st.error(f"Tuning failed: {str(e)} 💥 (Latent snag!)")
logger.error(f"Tuning failed: {str(e)}")
raise
return self
def save_model(self, path: str):
try:
with st.spinner("Saving model... 💾 (Packing latent dreams!)"):
os.makedirs(os.path.dirname(path), exist_ok=True)
self.pipeline.save_pretrained(path)
st.success(f"Saved at {path}! ✅ Latent titan stashed!")
logger.info(f"Saved at {path}")
except Exception as e:
st.error(f"Save failed: {str(e)} 💥 (Dreamy mishap!)")
logger.error(f"Save failed: {str(e)}")
raise
def generate(self, prompt: str):
try:
return self.pipeline(prompt, num_inference_steps=30).images[0]
except Exception as e:
st.error(f"Generation failed: {str(e)} 💥 (Latent oopsie!)")
logger.error(f"Generation failed: {str(e)}")
raise
class FluxDiffusionBuilder:
"""Distilled dynamo ⚡ - High-quality pixels in a small package!"""
def __init__(self):
self.config = None
self.pipeline = None
self.jokes = ["Flux-tastic! ✨", "Small size, big wow! 🎇"]
def load_model(self, model_path: str, config: Optional[DiffusionConfig] = None):
try:
with st.spinner(f"Loading {model_path}... ⏳ (Flux titan charging!)"):
self.pipeline = DiffusionPipeline.from_pretrained(model_path, low_cpu_mem_usage=True)
self.pipeline.to("cuda" if torch.cuda.is_available() else "cpu")
if config:
self.config = config
st.success(f"Model loaded! 🎉 {random.choice(self.jokes)}")
logger.info(f"Loaded FLUX.1 Distilled: {model_path}")
except Exception as e:
st.error(f"Failed to load {model_path}: {str(e)} 💥 (Flux fizzle!)")
logger.error(f"Failed to load {model_path}: {str(e)}")
raise
return self
def fine_tune_sft(self, images, texts, epochs=3):
try:
dataset = DiffusionDataset(images, texts)
            dataloader = DataLoader(dataset, batch_size=1, shuffle=True, collate_fn=lambda items: {"image": [i["image"] for i in items], "text": [i["text"] for i in items]})  # default_collate cannot batch PIL images
optimizer = torch.optim.AdamW(self.pipeline.unet.parameters(), lr=1e-5)
self.pipeline.unet.train()
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
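            # Same SD-style SFT loop as the other builders; note that true FLUX pipelines expose a
            # `transformer` rather than a `unet`, so this loop assumes a U-Net-based pipeline was loaded.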
for epoch in range(epochs):
with st.spinner(f"Epoch {epoch + 1}/{epochs}... ⚙️ (Flux titan powering up!)"):
total_loss = 0
for batch in dataloader:
optimizer.zero_grad()
                        image = batch["image"][0]  # PIL image; converted to a tensor below (PIL images have no .to())
                        text = batch["text"][0]
                        pixels = torch.tensor(np.array(image)).permute(2, 0, 1).unsqueeze(0).float().to(device) / 127.5 - 1.0  # scale pixels to [-1, 1] for the VAE
                        latents = self.pipeline.vae.encode(pixels).latent_dist.sample()
noise = torch.randn_like(latents)
                        timesteps = torch.randint(0, self.pipeline.scheduler.config.num_train_timesteps, (latents.shape[0],), device=latents.device)
noisy_latents = self.pipeline.scheduler.add_noise(latents, noise, timesteps)
text_embeddings = self.pipeline.text_encoder(self.pipeline.tokenizer(text, return_tensors="pt").input_ids.to(device))[0]
pred_noise = self.pipeline.unet(noisy_latents, timesteps, encoder_hidden_states=text_embeddings).sample
loss = torch.nn.functional.mse_loss(pred_noise, noise)
loss.backward()
optimizer.step()
total_loss += loss.item()
st.write(f"Epoch {epoch + 1} done! Loss: {total_loss / len(dataloader):.4f}")
st.success(f"FLUX Diffusion tuned! 🎉 {random.choice(self.jokes)}")
logger.info(f"Fine-tuned FLUX.1 Distilled: {self.config.name}")
except Exception as e:
st.error(f"Tuning failed: {str(e)} 💥 (Flux snag!)")
logger.error(f"Tuning failed: {str(e)}")
raise
return self
def save_model(self, path: str):
try:
with st.spinner("Saving model... 💾 (Packing flux magic!)"):
os.makedirs(os.path.dirname(path), exist_ok=True)
self.pipeline.save_pretrained(path)
st.success(f"Saved at {path}! ✅ Flux titan secured!")
logger.info(f"Saved at {path}")
except Exception as e:
st.error(f"Save failed: {str(e)} 💥 (Fluxy mishap!)")
logger.error(f"Save failed: {str(e)}")
raise
def generate(self, prompt: str):
try:
return self.pipeline(prompt, num_inference_steps=50).images[0]
except Exception as e:
st.error(f"Generation failed: {str(e)} 💥 (Flux oopsie!)")
logger.error(f"Generation failed: {str(e)}")
raise
def generate_filename(sequence, ext="png"):
"""Time-stamped snapshots ⏰ - Keeps our pics organized with cam flair!"""
from datetime import datetime
import pytz
central = pytz.timezone('US/Central')
dt = datetime.now(central)
return f"{dt.strftime('%m-%d-%Y-%I-%M-%S-%p')}-{sequence}.{ext}"
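# Example (illustrative): generate_filename("cam0") -> "03-15-2025-02-30-45-PM-cam0.png"
# (month-day-year-hour-minute-second-AM/PM in US/Central, followed by the sequence tag).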
def get_download_link(file_path, mime_type="text/plain", label="Download"):
"""Magic link maker 🔗 - Snag your files with a click!"""
try:
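        # The file's bytes are embedded as a base64 data: URI, so the link downloads straight from
        # the rendered HTML without a separate endpoint.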
with open(file_path, 'rb') as f:
data = f.read()
b64 = base64.b64encode(data).decode()
return f'<a href="data:{mime_type};base64,{b64}" download="{os.path.basename(file_path)}">{label} 📥</a>'
except Exception as e:
logger.error(f"Failed to generate link for {file_path}: {str(e)}")
return f"Error: Could not generate link for {file_path}"
def zip_files(files, zip_path):
"""Zip zap zoo 🎒 - Bundle up your goodies!"""
try:
with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
for file in files:
zipf.write(file, os.path.basename(file))
logger.info(f"Created ZIP file: {zip_path}")
except Exception as e:
logger.error(f"Failed to create ZIP {zip_path}: {str(e)}")
raise
def delete_files(files):
"""Trash titan 🗑️ - Clear the stage for new stars!"""
try:
for file in files:
os.remove(file)
logger.info(f"Deleted file: {file}")
st.session_state['captured_images'] = [f for f in st.session_state['captured_images'] if f not in files]
except Exception as e:
logger.error(f"Failed to delete files: {str(e)}")
raise
def get_model_files():
"""Model treasure hunt 🗺️ - Find our diffusion gems!"""
return [d for d in glob.glob("diffusion_models/*") if os.path.isdir(d)]
def get_gallery_files(file_types):
"""Gallery curator 🖼️ - Showcase our pixel masterpieces!"""
return sorted(list(set(f for ext in file_types for f in glob.glob(f"*.{ext}"))))
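# Note: the gallery helpers glob the current working directory, so freshly captured or uploaded
# files are picked up on the next rerun.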
def update_gallery():
"""Gallery refresh 🌟 - Keep the art flowing!"""
media_files = get_gallery_files(["png"])
if media_files:
cols = st.sidebar.columns(2)
for idx, file in enumerate(media_files[:gallery_size * 2]):
with cols[idx % 2]:
st.image(Image.open(file), caption=file, use_container_width=True)
st.markdown(get_download_link(file, "image/png", "Download Snap 📸"), unsafe_allow_html=True)
def get_available_video_devices():
"""Camera roll call 🎥 - Who’s ready to shine? Fallback if OpenCV flops!"""
    video_devices = [f"Camera {i} 🎥" for i in range(6)]  # Fallback list of six cameras
    try:
        detected = []
        for i in range(6):  # Probe up to six device indices
cap = cv2.VideoCapture(i, cv2.CAP_V4L2)
if not cap.isOpened():
cap = cv2.VideoCapture(i)
if cap.isOpened():
detected.append(f"Camera {i} 🎥")
logger.info(f"Detected camera at index {i}")
cap.release()
if detected:
video_devices = detected
else:
logger.warning("No cameras detected by OpenCV; using defaults")
except Exception as e:
logger.error(f"Error detecting cameras: {str(e)} - Falling back to defaults")
return video_devices
st.title("SFT Tiny Titans 🚀 (Small Diffusion Delight!)")
st.sidebar.header("Media Gallery 🎨")
gallery_size = st.sidebar.slider("Gallery Size 📸", 1, 10, 4, help="How many snaps to flaunt? 🌟")
update_gallery()
col1, col2 = st.sidebar.columns(2)
with col1:
if st.button("Download All 📦"):
media_files = get_gallery_files(["png"])
if media_files:
zip_path = f"snapshot_collection_{int(time.time())}.zip"
zip_files(media_files, zip_path)
st.sidebar.markdown(get_download_link(zip_path, "application/zip", "Download All Snaps 📦"), unsafe_allow_html=True)
st.sidebar.success("Snaps zipped! 🎉 Grab your loot!")
else:
st.sidebar.warning("No snaps to zip! 📸 Snap some first!")
with col2:
if st.button("Delete All 🗑️"):
media_files = get_gallery_files(["png"])
if media_files:
delete_files(media_files)
st.sidebar.success("Snaps vanquished! 🧹 Gallery cleared!")
update_gallery()
else:
st.sidebar.warning("Nothing to delete! 📸 Snap some pics!")
uploaded_files = st.sidebar.file_uploader("Upload Goodies 🎵🎥🖼️📝📜", type=["mp3", "mp4", "png", "jpeg", "md", "pdf", "docx"], accept_multiple_files=True)
if uploaded_files:
for uploaded_file in uploaded_files:
filename = uploaded_file.name
with open(filename, "wb") as f:
f.write(uploaded_file.getvalue())
logger.info(f"Uploaded file: {filename}")
st.sidebar.subheader("Image Gallery 🖼️")
image_files = get_gallery_files(["png", "jpeg"])
if image_files:
cols = st.sidebar.columns(2)
for idx, file in enumerate(image_files[:gallery_size * 2]):
with cols[idx % 2]:
st.image(Image.open(file), caption=file, use_container_width=True)
st.markdown(get_download_link(file, "image/png" if file.endswith(".png") else "image/jpeg", f"Save Pic 🖼️"), unsafe_allow_html=True)
st.sidebar.subheader("Model Management 🗂️")
model_dirs = get_model_files()
selected_model = st.sidebar.selectbox("Select Saved Model", ["None"] + model_dirs)
model_type = st.sidebar.selectbox("Diffusion Type", ["Micro Diffusion", "Latent Diffusion", "FLUX.1 Distilled"])
if selected_model != "None" and st.sidebar.button("Load Model 📂"):
builder = {
"Micro Diffusion": MicroDiffusionBuilder,
"Latent Diffusion": LatentDiffusionBuilder,
"FLUX.1 Distilled": FluxDiffusionBuilder
}[model_type]()
config = DiffusionConfig(name=os.path.basename(selected_model), base_model="unknown", size="small")
try:
builder.load_model(selected_model, config)
st.session_state['cv_builder'] = builder
st.session_state['cv_loaded'] = True
st.rerun()
except Exception as e:
st.error(f"Model load failed: {str(e)} 💥 (Check logs for details!)")
st.sidebar.subheader("Model Status 🚦")
st.sidebar.write(f"**CV Model**: {'Loaded' if st.session_state['cv_loaded'] else 'Not Loaded'} {'(Active)' if st.session_state['cv_loaded'] and isinstance(st.session_state.get('cv_builder'), (MicroDiffusionBuilder, LatentDiffusionBuilder, FluxDiffusionBuilder)) else ''}")
tabs = ["Build Titan 🌱", "Camera Snap 📷", "Fine-Tune Titan (CV) 🔧", "Test Titan (CV) 🧪", "Agentic RAG Party (CV) 🌐"]
tab1, tab2, tab3, tab4, tab5 = st.tabs(tabs)
for i, tab in enumerate(tabs):
if st.session_state['active_tab'] != tab and st.session_state.get(f'tab{i}_active', False):
logger.info(f"Switched to tab: {tab}")
st.session_state['active_tab'] = tab
st.session_state[f'tab{i}_active'] = (st.session_state['active_tab'] == tab)
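# Note: st.tabs does not report which tab the user selected, so the bookkeeping above is
# best-effort logging only; it does not change what gets rendered.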
with tab1:
st.header("Build Titan 🌱")
model_type = st.selectbox("Diffusion Type", ["Micro Diffusion", "Latent Diffusion", "FLUX.1 Distilled"], key="build_type")
    base_model = st.selectbox("Select Tiny Model", {
        "Micro Diffusion": ["CompVis/ldm-text2im-large-256"],
        "Latent Diffusion": ["runwayml/stable-diffusion-v1-5"],
        "FLUX.1 Distilled": ["black-forest-labs/FLUX.1-schnell"]  # published distilled FLUX checkpoint; "flux.1-distilled" is not a Hub repo
    }[model_type])
model_name = st.text_input("Model Name", f"tiny-titan-{int(time.time())}")
if st.button("Download Model ⬇️"):
config = DiffusionConfig(name=model_name, base_model=base_model, size="small")
builder = {
"Micro Diffusion": MicroDiffusionBuilder,
"Latent Diffusion": LatentDiffusionBuilder,
"FLUX.1 Distilled": FluxDiffusionBuilder
}[model_type]()
try:
builder.load_model(base_model, config)
builder.save_model(config.model_path)
st.session_state['cv_builder'] = builder
st.session_state['cv_loaded'] = True
st.rerun()
except Exception as e:
st.error(f"Model build failed: {str(e)} 💥 (Check logs for details!)")
with tab2:
st.header("Camera Snap 📷 (Dual Capture Fiesta!)")
video_devices = get_available_video_devices()
st.write(f"🎉 Detected Cameras: {', '.join(video_devices)}")
st.info("Switch cams in your browser settings (e.g., Chrome > Privacy > Camera) since I’m a browser star! 🌟")
st.subheader("Camera 0 🎬 - Lights, Camera, Action!")
cam0_cols = st.columns(4)
with cam0_cols[0]:
cam0_device = st.selectbox("Cam 📷", video_devices, index=0, key="cam0_device", help="Pick your star cam! 🌟")
with cam0_cols[1]:
cam0_label = st.text_input("Tag 🏷️", "Cam 0 Snap", key="cam0_label", help="Name your masterpiece! 🎨")
with cam0_cols[2]:
cam0_help = st.text_input("Hint 💡", "Snap a heroic moment! 🦸‍♂️", key="cam0_help", help="Give a fun tip!")
with cam0_cols[3]:
cam0_vis = st.selectbox("Show 🖼️", ["visible", "hidden", "collapsed"], index=0, key="cam0_vis", help="Label vibes: Visible, Sneaky, or Gone!")
st.subheader("Camera 1 🎥 - Roll the Film!")
cam1_cols = st.columns(4)
with cam1_cols[0]:
cam1_device = st.selectbox("Cam 📷", video_devices, index=1 if len(video_devices) > 1 else 0, key="cam1_device", help="Choose your blockbuster cam! 🎬")
with cam1_cols[1]:
cam1_label = st.text_input("Tag 🏷️", "Cam 1 Snap", key="cam1_label", help="Title your epic shot! 🌠")
with cam1_cols[2]:
cam1_help = st.text_input("Hint 💡", "Grab an epic frame! 🌟", key="cam1_help", help="Drop a cheeky hint!")
with cam1_cols[3]:
cam1_vis = st.selectbox("Show 🖼️", ["visible", "hidden", "collapsed"], index=0, key="cam1_vis", help="Label style: Show it, Hide it, Poof!")
cols = st.columns(2)
with cols[0]:
st.subheader(f"Camera 0 ({cam0_device}) 🎬")
cam0_img = st.camera_input(
label=cam0_label,
key="cam0",
help=cam0_help,
disabled=False,
label_visibility=cam0_vis
)
if cam0_img:
filename = generate_filename("cam0")
with open(filename, "wb") as f:
f.write(cam0_img.getvalue())
st.image(Image.open(filename), caption=filename, use_container_width=True)
logger.info(f"Saved snapshot from Camera 0: {filename}")
st.session_state['captured_images'].append(filename)
update_gallery()
st.info("🚨 One snap at a time—your Titan’s too cool for bursts! 😎")
with cols[1]:
st.subheader(f"Camera 1 ({cam1_device}) 🎥")
cam1_img = st.camera_input(
label=cam1_label,
key="cam1",
help=cam1_help,
disabled=False,
label_visibility=cam1_vis
)
if cam1_img:
filename = generate_filename("cam1")
with open(filename, "wb") as f:
f.write(cam1_img.getvalue())
st.image(Image.open(filename), caption=filename, use_container_width=True)
logger.info(f"Saved snapshot from Camera 1: {filename}")
st.session_state['captured_images'].append(filename)
update_gallery()
st.info("🚨 Single shots only—craft your masterpiece! 🎨")
with tab3:
st.header("Fine-Tune Titan (CV) 🔧 (Sculpt Your Pixel Prodigy!)")
if not st.session_state['cv_loaded'] or not isinstance(st.session_state['cv_builder'], (MicroDiffusionBuilder, LatentDiffusionBuilder, FluxDiffusionBuilder)):
st.warning("Please build or load a CV Titan first! ⚠️ (No artist, no canvas!)")
else:
captured_images = get_gallery_files(["png"])
if len(captured_images) >= 2:
st.subheader("Use Case 1: Denoise Snapshots 🌟")
denoising_data = [{"image": img, "text": f"Denoised {os.path.basename(img).split('-')[4]} snap"} for img in captured_images[:min(len(captured_images), 10)]]
denoising_edited = st.data_editor(pd.DataFrame(denoising_data), num_rows="dynamic", help="Craft denoising pairs! 🌟")
if st.button("Fine-Tune Denoising 🔄"):
images = [Image.open(row["image"]) for _, row in denoising_edited.iterrows()]
texts = [row["text"] for _, row in denoising_edited.iterrows()]
new_model_name = f"{st.session_state['cv_builder'].config.name}-denoise-{int(time.time())}"
new_config = DiffusionConfig(name=new_model_name, base_model=st.session_state['cv_builder'].config.base_model, size="small")
st.session_state['cv_builder'].config = new_config
with st.status("Fine-tuning for denoising... ⏳ (Polishing pixels!)", expanded=True) as status:
st.session_state['cv_builder'].fine_tune_sft(images, texts)
st.session_state['cv_builder'].save_model(new_config.model_path)
status.update(label="Denoising tuned! 🎉 (Pixel shine unleashed!)", state="complete")
zip_path = f"{new_config.model_path}.zip"
zip_files([new_config.model_path], zip_path)
st.markdown(get_download_link(zip_path, "application/zip", "Download Denoised Titan 📦"), unsafe_allow_html=True)
denoising_csv = f"denoise_dataset_{int(time.time())}.csv"
with open(denoising_csv, "w", newline="") as f:
writer = csv.writer(f)
writer.writerow(["image", "text"])
for _, row in denoising_edited.iterrows():
writer.writerow([row["image"], row["text"]])
st.markdown(get_download_link(denoising_csv, "text/csv", "Download Denoising CSV 📜"), unsafe_allow_html=True)
st.subheader("Use Case 2: Stylize Snapshots 🎨")
stylize_data = [{"image": img, "text": f"Neon {os.path.basename(img).split('-')[4]} style"} for img in captured_images[:min(len(captured_images), 10)]]
stylize_edited = st.data_editor(pd.DataFrame(stylize_data), num_rows="dynamic", help="Craft stylized pairs! 🎨")
if st.button("Fine-Tune Stylization 🔄"):
images = [Image.open(row["image"]) for _, row in stylize_edited.iterrows()]
texts = [row["text"] for _, row in stylize_edited.iterrows()]
new_model_name = f"{st.session_state['cv_builder'].config.name}-stylize-{int(time.time())}"
new_config = DiffusionConfig(name=new_model_name, base_model=st.session_state['cv_builder'].config.base_model, size="small")
st.session_state['cv_builder'].config = new_config
with st.status("Fine-tuning for stylization... ⏳ (Painting pixels!)", expanded=True) as status:
st.session_state['cv_builder'].fine_tune_sft(images, texts)
st.session_state['cv_builder'].save_model(new_config.model_path)
status.update(label="Stylization tuned! 🎉 (Pixel art unleashed!)", state="complete")
zip_path = f"{new_config.model_path}.zip"
zip_files([new_config.model_path], zip_path)
st.markdown(get_download_link(zip_path, "application/zip", "Download Stylized Titan 📦"), unsafe_allow_html=True)
stylize_md = f"stylize_dataset_{int(time.time())}.md"
with open(stylize_md, "w") as f:
f.write("# Stylization Dataset\n\n")
for _, row in stylize_edited.iterrows():
f.write(f"- `{row['image']}`: {row['text']}\n")
st.markdown(get_download_link(stylize_md, "text/markdown", "Download Stylization MD 📝"), unsafe_allow_html=True)
st.subheader("Use Case 3: Multi-Angle Snapshots 🌐")
multiangle_data = [{"image": img, "text": f"View from {os.path.basename(img).split('-')[4]}"} for img in captured_images[:min(len(captured_images), 10)]]
multiangle_edited = st.data_editor(pd.DataFrame(multiangle_data), num_rows="dynamic", help="Craft multi-angle pairs! 🌐")
if st.button("Fine-Tune Multi-Angle 🔄"):
images = [Image.open(row["image"]) for _, row in multiangle_edited.iterrows()]
texts = [row["text"] for _, row in multiangle_edited.iterrows()]
new_model_name = f"{st.session_state['cv_builder'].config.name}-multiangle-{int(time.time())}"
new_config = DiffusionConfig(name=new_model_name, base_model=st.session_state['cv_builder'].config.base_model, size="small")
st.session_state['cv_builder'].config = new_config
with st.status("Fine-tuning for multi-angle... ⏳ (Spinning pixels!)", expanded=True) as status:
st.session_state['cv_builder'].fine_tune_sft(images, texts)
st.session_state['cv_builder'].save_model(new_config.model_path)
status.update(label="Multi-angle tuned! 🎉 (Pixel views unleashed!)", state="complete")
zip_path = f"{new_config.model_path}.zip"
zip_files([new_config.model_path], zip_path)
st.markdown(get_download_link(zip_path, "application/zip", "Download Multi-Angle Titan 📦"), unsafe_allow_html=True)
multiangle_csv = f"multiangle_dataset_{int(time.time())}.csv"
with open(multiangle_csv, "w", newline="") as f:
writer = csv.writer(f)
writer.writerow(["image", "text"])
for _, row in multiangle_edited.iterrows():
writer.writerow([row["image"], row["text"]])
st.markdown(get_download_link(multiangle_csv, "text/csv", "Download Multi-Angle CSV 📜"), unsafe_allow_html=True)
with tab4:
st.header("Test Titan (CV) 🧪 (Unleash Your Pixel Power!)")
if not st.session_state['cv_loaded'] or not isinstance(st.session_state['cv_builder'], (MicroDiffusionBuilder, LatentDiffusionBuilder, FluxDiffusionBuilder)):
st.warning("Please build or load a CV Titan first! ⚠️ (No artist, no masterpiece!)")
else:
st.subheader("Test Your Titan 🎨")
test_prompt = st.text_area("Prompt 🎤", "Neon glow from cam0", help="Dream up a wild image—your Titan’s ready to paint! 🖌️")
if st.button("Generate ▶️"):
with st.spinner("Crafting your masterpiece... ⏳ (Titan’s mixing pixels!)"):
image = st.session_state['cv_builder'].generate(test_prompt)
st.image(image, caption=f"Generated: {test_prompt}", use_container_width=True)
with tab5:
st.header("Agentic RAG Party (CV) 🌐 (Pixel Party Extravaganza!)")
st.write("Generate superhero party vibes from your tuned Titan! 🎉")
if not st.session_state['cv_loaded'] or not isinstance(st.session_state['cv_builder'], (MicroDiffusionBuilder, LatentDiffusionBuilder, FluxDiffusionBuilder)):
st.warning("Please build or load a CV Titan first! ⚠️ (No artist, no party!)")
else:
if st.button("Run RAG Demo 🎉"):
with st.spinner("Loading your pixel party titan... ⏳ (Titan’s grabbing its brush!)"):
class CVPartyAgent:
def __init__(self, pipeline):
self.pipeline = pipeline
def generate(self, prompt: str) -> Image.Image:
return self.pipeline(prompt, num_inference_steps=50).images[0]
def plan_party(self):
prompts = [
"Gold-plated Batman statue from cam0",
"VR superhero battle scene from cam1",
"Neon-lit Avengers tower from cam2"
]
data = [{"Theme": f"Scene {i+1}", "Image Idea": prompt} for i, prompt in enumerate(prompts)]
return pd.DataFrame(data)
agent = CVPartyAgent(st.session_state['cv_builder'].pipeline)
st.write("Party agent ready! 🎨 (Time to paint an epic bash!)")
with st.spinner("Crafting superhero party visuals... ⏳ (Pixels assemble!)"):
try:
plan_df = agent.plan_party()
st.dataframe(plan_df)
for _, row in plan_df.iterrows():
image = agent.generate(row["Image Idea"])
st.image(image, caption=f"{row['Theme']} - {row['Image Idea']}", use_container_width=True)
except Exception as e:
st.error(f"Party crashed: {str(e)} 💥 (Pixel oopsie!)")
logger.error(f"RAG demo failed: {str(e)}")
st.sidebar.subheader("Action Logs 📜")
log_container = st.sidebar.empty()
with log_container:
for record in log_records:
        st.write(f"{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(record.created))} - {record.levelname} - {record.getMessage()}")
update_gallery()