|
|
|
import os |
|
import glob |
|
import base64 |
|
import streamlit as st |
|
import pandas as pd |
|
import torch |
|
from torch.utils.data import Dataset, DataLoader |
|
import csv |
|
import time |
|
from dataclasses import dataclass |
|
from typing import Optional |
|
import zipfile |
|
import math |
|
from PIL import Image |
|
import random |
|
import logging |
|
import numpy as np |
|
import cv2 |
|
from diffusers import DiffusionPipeline |
|
|
|
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)

# Records captured during the current script run; rendered later in the sidebar.
log_records = []


class LogCaptureHandler(logging.Handler):
    """Logging handler that stores every emitted record in ``log_records``.

    ``message`` and ``asctime`` are normally attached to a record only when a
    ``logging.Formatter`` formats it. They are filled in here so code that
    reads those attributes works no matter which other handlers are installed.
    """

    # Used only for its default timestamp formatting.
    _clock = logging.Formatter()

    def emit(self, record):
        """Attach ``message``/``asctime`` to ``record`` and keep it."""
        try:
            record.message = record.getMessage()
            record.asctime = self._clock.formatTime(record)
            log_records.append(record)
        except Exception:
            # Standard logging contract: a handler never raises into the caller.
            self.handleError(record)


# The logger object outlives a single execution of this module, so handlers
# attached by earlier executions are removed before attaching a fresh one.
# Matching is by class name because the class object is re-created each time.
for _stale in [h for h in logger.handlers if type(h).__name__ == "LogCaptureHandler"]:
    logger.removeHandler(_stale)
logger.addHandler(LogCaptureHandler())
|
|
|
# Page chrome; this is the first Streamlit call in the script.
_MENU_LINKS = {
    'Get Help': 'https://huggingface.co/awacke1',
    'Report a Bug': 'https://huggingface.co/spaces/awacke1',
    'About': "Tiny Titans: Small diffusion models, big CV dreams! 🌌",
}
st.set_page_config(
    page_title="SFT Tiny Titans 🚀",
    page_icon="🤖",
    layout="wide",
    initial_sidebar_state="expanded",
    menu_items=_MENU_LINKS,
)

# Seed session-state entries that the rest of the script reads; values that
# already exist are left untouched.
for _state_key, _initial in (
    ('captured_images', []),
    ('cv_builder', None),
    ('cv_loaded', False),
    ('active_tab', "Build Titan 🌱"),
):
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _initial
|
|
|
@dataclass
class DiffusionConfig:
    """Describes one diffusion model: its name, the model it came from, and a size label."""

    # Model name; also the last component of ``model_path``.
    name: str
    # Identifier of the model this one was derived from.
    base_model: str
    # Free-form size label.
    size: str

    @property
    def model_path(self):
        """Directory for this model: ``diffusion_models/<name>``."""
        return "diffusion_models/{}".format(self.name)
|
|
|
class DiffusionDataset(Dataset):
    """Index-aligned pairs of images and texts exposed as a torch ``Dataset``."""

    def __init__(self, images, texts):
        """Keep references to the two parallel sequences."""
        self.images, self.texts = images, texts

    def __len__(self):
        """Number of samples, taken from the image sequence."""
        return len(self.images)

    def __getitem__(self, idx):
        """Return ``{"image": ..., "text": ...}`` for position ``idx``."""
        sample = dict(image=self.images[idx], text=self.texts[idx])
        return sample
|
|
|
class MicroDiffusionBuilder:
    """Load, fine-tune, save and sample from a diffusers pipeline.

    Every step reports progress and failures through Streamlit and the module
    logger, and re-raises any exception after reporting it.
    """

    # Pipeline attributes the fine-tuning loop reads.
    _TRAINING_PARTS = ("unet", "vae", "text_encoder", "tokenizer", "scheduler")

    def __init__(self):
        self.config = None
        self.pipeline = None
        self.jokes = ["Micro but mighty! 💪", "Small pixels, big dreams! 🌟"]

    def load_model(self, model_path: str, config: Optional[DiffusionConfig] = None):
        """Load a pipeline from ``model_path`` and move it to CUDA when available.

        Args:
            model_path: Value passed to ``DiffusionPipeline.from_pretrained``.
            config: Stored on the builder when truthy.

        Returns:
            ``self``, so calls can be chained.

        Raises:
            Exception: whatever loading raised, after it has been reported.
        """
        try:
            with st.spinner(f"Loading {model_path}... ⏳ (Tiny titan powering up!)"):
                self.pipeline = DiffusionPipeline.from_pretrained(model_path, low_cpu_mem_usage=True)
                self.pipeline.to("cuda" if torch.cuda.is_available() else "cpu")
                if config:
                    self.config = config
            st.success(f"Model loaded! 🎉 {random.choice(self.jokes)}")
            logger.info(f"Loaded Micro Diffusion: {model_path}")
        except Exception as e:
            st.error(f"Failed to load {model_path}: {str(e)} 💥 (Tiny titan tripped!)")
            logger.error(f"Failed to load {model_path}: {str(e)}")
            raise
        return self

    @staticmethod
    def _to_pixel_tensor(image):
        """Turn an image into a 1x3xHxW float tensor scaled to [-1, 1]."""
        if hasattr(image, "convert"):
            # Drop alpha / expand greyscale so there are exactly three channels.
            image = image.convert("RGB")
        pixels = torch.tensor(np.array(image)).permute(2, 0, 1).unsqueeze(0).float()
        return pixels / 127.5 - 1.0

    def fine_tune_sft(self, images, texts, epochs=3):
        """Fine-tune the pipeline's UNet on image/caption pairs with a noise-prediction MSE loss.

        Args:
            images: Images, one per caption (PIL images or HxWxC arrays).
            texts: Captions, index-aligned with ``images``.
            epochs: Number of passes over the data.

        Returns:
            ``self``.

        Raises:
            ValueError: if there are no samples.
            AttributeError: if the pipeline lacks a component this loop needs.
            Exception: anything else raised while training, after reporting.
        """
        try:
            pipe = self.pipeline
            missing = [part for part in self._TRAINING_PARTS if getattr(pipe, part, None) is None]
            if missing:
                raise AttributeError(
                    f"Pipeline is missing {', '.join(missing)}; "
                    "fine-tuning needs a UNet-based text-to-image pipeline"
                )
            dataset = DiffusionDataset(images, texts)
            if len(dataset) == 0:
                raise ValueError("No training samples provided")
            # The default collate function cannot batch PIL images; with
            # batch_size=1 the single sample is simply passed through.
            dataloader = DataLoader(dataset, batch_size=1, shuffle=True,
                                    collate_fn=lambda samples: samples[0])
            optimizer = torch.optim.AdamW(pipe.unet.parameters(), lr=1e-5)
            pipe.unet.train()
            device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
            num_train_timesteps = pipe.scheduler.config.num_train_timesteps
            # NOTE(review): assumes an AutoencoderKL-style VAE; falls back to 1.0
            # when the config has no scaling factor.
            latent_scale = getattr(pipe.vae.config, "scaling_factor", 1.0)
            for epoch in range(epochs):
                with st.spinner(f"Epoch {epoch + 1}/{epochs}... ⚙️ (Micro titan flexing!)"):
                    total_loss = 0
                    for sample in dataloader:
                        optimizer.zero_grad()
                        # Only the UNet is optimised, so the encoders run without autograd.
                        with torch.no_grad():
                            pixels = self._to_pixel_tensor(sample["image"]).to(device)
                            latents = pipe.vae.encode(pixels).latent_dist.sample() * latent_scale
                            token_ids = pipe.tokenizer(
                                sample["text"],
                                padding="max_length",
                                max_length=pipe.tokenizer.model_max_length,
                                truncation=True,
                                return_tensors="pt",
                            ).input_ids.to(device)
                            text_embeddings = pipe.text_encoder(token_ids)[0]
                        noise = torch.randn_like(latents)
                        timesteps = torch.randint(0, num_train_timesteps, (latents.shape[0],), device=latents.device)
                        noisy_latents = pipe.scheduler.add_noise(latents, noise, timesteps)
                        pred_noise = pipe.unet(noisy_latents, timesteps, encoder_hidden_states=text_embeddings).sample
                        loss = torch.nn.functional.mse_loss(pred_noise, noise)
                        loss.backward()
                        optimizer.step()
                        total_loss += loss.item()
                    st.write(f"Epoch {epoch + 1} done! Loss: {total_loss / len(dataloader):.4f}")
            # Leave the UNet in inference mode for later generate() calls.
            pipe.unet.eval()
            st.success(f"Micro Diffusion tuned! 🎉 {random.choice(self.jokes)}")
            tuned_name = self.config.name if self.config else "unnamed"
            logger.info(f"Fine-tuned Micro Diffusion: {tuned_name}")
        except Exception as e:
            st.error(f"Tuning failed: {str(e)} 💥 (Micro snag!)")
            logger.error(f"Tuning failed: {str(e)}")
            raise
        return self

    def save_model(self, path: str):
        """Write the pipeline to ``path``, creating the parent directory if there is one.

        Raises:
            Exception: whatever saving raised, after it has been reported.
        """
        try:
            with st.spinner("Saving model... 💾 (Packing tiny pixels!)"):
                parent = os.path.dirname(path)
                if parent:  # a bare name has no parent; makedirs("") would fail
                    os.makedirs(parent, exist_ok=True)
                self.pipeline.save_pretrained(path)
            st.success(f"Saved at {path}! ✅ Tiny titan secured!")
            logger.info(f"Saved at {path}")
        except Exception as e:
            st.error(f"Save failed: {str(e)} 💥 (Packing mishap!)")
            logger.error(f"Save failed: {str(e)}")
            raise

    def generate(self, prompt: str):
        """Run the pipeline on ``prompt`` with 20 inference steps and return the first image.

        Raises:
            Exception: whatever the pipeline raised, after it has been reported.
        """
        try:
            return self.pipeline(prompt, num_inference_steps=20).images[0]
        except Exception as e:
            st.error(f"Generation failed: {str(e)} 💥 (Pixel oopsie!)")
            logger.error(f"Generation failed: {str(e)}")
            raise
|
|
|
class LatentDiffusionBuilder:
    """Load, fine-tune, save and sample from a latent diffusion pipeline.

    Every step reports progress and failures through Streamlit and the module
    logger, and re-raises any exception after reporting it.
    """

    # Pipeline attributes the fine-tuning loop reads.
    _TRAINING_PARTS = ("unet", "vae", "text_encoder", "tokenizer", "scheduler")

    def __init__(self):
        self.config = None
        self.pipeline = None
        self.jokes = ["Latent vibes only! 🌀", "Small scale, big style! 🎨"]

    def load_model(self, model_path: str, config: Optional[DiffusionConfig] = None):
        """Load a pipeline from ``model_path`` and move it to CUDA when available.

        Args:
            model_path: Value passed to ``DiffusionPipeline.from_pretrained``.
            config: Stored on the builder when truthy.

        Returns:
            ``self``, so calls can be chained.

        Raises:
            Exception: whatever loading raised, after it has been reported.
        """
        try:
            with st.spinner(f"Loading {model_path}... ⏳ (Latent titan rising!)"):
                self.pipeline = DiffusionPipeline.from_pretrained(model_path, low_cpu_mem_usage=True)
                # The UNet is kept intact. Replacing it with an nn.Sequential of
                # its first children removes `.config` and the
                # (sample, timestep, encoder_hidden_states=...) call signature
                # that both the pipeline and fine_tune_sft rely on.
                self.pipeline.to("cuda" if torch.cuda.is_available() else "cpu")
                if config:
                    self.config = config
            st.success(f"Model loaded! 🎉 {random.choice(self.jokes)}")
            logger.info(f"Loaded Latent Diffusion: {model_path}")
        except Exception as e:
            st.error(f"Failed to load {model_path}: {str(e)} 💥 (Latent hiccup!)")
            logger.error(f"Failed to load {model_path}: {str(e)}")
            raise
        return self

    @staticmethod
    def _to_pixel_tensor(image):
        """Turn an image into a 1x3xHxW float tensor scaled to [-1, 1]."""
        if hasattr(image, "convert"):
            # Drop alpha / expand greyscale so there are exactly three channels.
            image = image.convert("RGB")
        pixels = torch.tensor(np.array(image)).permute(2, 0, 1).unsqueeze(0).float()
        return pixels / 127.5 - 1.0

    def fine_tune_sft(self, images, texts, epochs=3):
        """Fine-tune the pipeline's UNet on image/caption pairs with a noise-prediction MSE loss.

        Args:
            images: Images, one per caption (PIL images or HxWxC arrays).
            texts: Captions, index-aligned with ``images``.
            epochs: Number of passes over the data.

        Returns:
            ``self``.

        Raises:
            ValueError: if there are no samples.
            AttributeError: if the pipeline lacks a component this loop needs.
            Exception: anything else raised while training, after reporting.
        """
        try:
            pipe = self.pipeline
            missing = [part for part in self._TRAINING_PARTS if getattr(pipe, part, None) is None]
            if missing:
                raise AttributeError(
                    f"Pipeline is missing {', '.join(missing)}; "
                    "fine-tuning needs a UNet-based text-to-image pipeline"
                )
            dataset = DiffusionDataset(images, texts)
            if len(dataset) == 0:
                raise ValueError("No training samples provided")
            # The default collate function cannot batch PIL images; with
            # batch_size=1 the single sample is simply passed through.
            dataloader = DataLoader(dataset, batch_size=1, shuffle=True,
                                    collate_fn=lambda samples: samples[0])
            optimizer = torch.optim.AdamW(pipe.unet.parameters(), lr=1e-5)
            pipe.unet.train()
            device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
            num_train_timesteps = pipe.scheduler.config.num_train_timesteps
            # NOTE(review): assumes an AutoencoderKL-style VAE; falls back to 1.0
            # when the config has no scaling factor.
            latent_scale = getattr(pipe.vae.config, "scaling_factor", 1.0)
            for epoch in range(epochs):
                with st.spinner(f"Epoch {epoch + 1}/{epochs}... ⚙️ (Latent titan shaping up!)"):
                    total_loss = 0
                    for sample in dataloader:
                        optimizer.zero_grad()
                        # Only the UNet is optimised, so the encoders run without autograd.
                        with torch.no_grad():
                            pixels = self._to_pixel_tensor(sample["image"]).to(device)
                            latents = pipe.vae.encode(pixels).latent_dist.sample() * latent_scale
                            token_ids = pipe.tokenizer(
                                sample["text"],
                                padding="max_length",
                                max_length=pipe.tokenizer.model_max_length,
                                truncation=True,
                                return_tensors="pt",
                            ).input_ids.to(device)
                            text_embeddings = pipe.text_encoder(token_ids)[0]
                        noise = torch.randn_like(latents)
                        timesteps = torch.randint(0, num_train_timesteps, (latents.shape[0],), device=latents.device)
                        noisy_latents = pipe.scheduler.add_noise(latents, noise, timesteps)
                        pred_noise = pipe.unet(noisy_latents, timesteps, encoder_hidden_states=text_embeddings).sample
                        loss = torch.nn.functional.mse_loss(pred_noise, noise)
                        loss.backward()
                        optimizer.step()
                        total_loss += loss.item()
                    st.write(f"Epoch {epoch + 1} done! Loss: {total_loss / len(dataloader):.4f}")
            # Leave the UNet in inference mode for later generate() calls.
            pipe.unet.eval()
            st.success(f"Latent Diffusion tuned! 🎉 {random.choice(self.jokes)}")
            tuned_name = self.config.name if self.config else "unnamed"
            logger.info(f"Fine-tuned Latent Diffusion: {tuned_name}")
        except Exception as e:
            st.error(f"Tuning failed: {str(e)} 💥 (Latent snag!)")
            logger.error(f"Tuning failed: {str(e)}")
            raise
        return self

    def save_model(self, path: str):
        """Write the pipeline to ``path``, creating the parent directory if there is one.

        Raises:
            Exception: whatever saving raised, after it has been reported.
        """
        try:
            with st.spinner("Saving model... 💾 (Packing latent dreams!)"):
                parent = os.path.dirname(path)
                if parent:  # a bare name has no parent; makedirs("") would fail
                    os.makedirs(parent, exist_ok=True)
                self.pipeline.save_pretrained(path)
            st.success(f"Saved at {path}! ✅ Latent titan stashed!")
            logger.info(f"Saved at {path}")
        except Exception as e:
            st.error(f"Save failed: {str(e)} 💥 (Dreamy mishap!)")
            logger.error(f"Save failed: {str(e)}")
            raise

    def generate(self, prompt: str):
        """Run the pipeline on ``prompt`` with 30 inference steps and return the first image.

        Raises:
            Exception: whatever the pipeline raised, after it has been reported.
        """
        try:
            return self.pipeline(prompt, num_inference_steps=30).images[0]
        except Exception as e:
            st.error(f"Generation failed: {str(e)} 💥 (Latent oopsie!)")
            logger.error(f"Generation failed: {str(e)}")
            raise
|
|
|
class FluxDiffusionBuilder:
    """Load, fine-tune, save and sample from a distilled diffusion pipeline.

    Every step reports progress and failures through Streamlit and the module
    logger, and re-raises any exception after reporting it.
    """

    # Pipeline attributes the fine-tuning loop reads.
    _TRAINING_PARTS = ("unet", "vae", "text_encoder", "tokenizer", "scheduler")

    def __init__(self):
        self.config = None
        self.pipeline = None
        self.jokes = ["Flux-tastic! ✨", "Small size, big wow! 🎇"]

    def load_model(self, model_path: str, config: Optional[DiffusionConfig] = None):
        """Load a pipeline from ``model_path`` and move it to CUDA when available.

        Args:
            model_path: Value passed to ``DiffusionPipeline.from_pretrained``.
            config: Stored on the builder when truthy.

        Returns:
            ``self``, so calls can be chained.

        Raises:
            Exception: whatever loading raised, after it has been reported.
        """
        try:
            with st.spinner(f"Loading {model_path}... ⏳ (Flux titan charging!)"):
                self.pipeline = DiffusionPipeline.from_pretrained(model_path, low_cpu_mem_usage=True)
                self.pipeline.to("cuda" if torch.cuda.is_available() else "cpu")
                if config:
                    self.config = config
            st.success(f"Model loaded! 🎉 {random.choice(self.jokes)}")
            logger.info(f"Loaded FLUX.1 Distilled: {model_path}")
        except Exception as e:
            st.error(f"Failed to load {model_path}: {str(e)} 💥 (Flux fizzle!)")
            logger.error(f"Failed to load {model_path}: {str(e)}")
            raise
        return self

    @staticmethod
    def _to_pixel_tensor(image):
        """Turn an image into a 1x3xHxW float tensor scaled to [-1, 1]."""
        if hasattr(image, "convert"):
            # Drop alpha / expand greyscale so there are exactly three channels.
            image = image.convert("RGB")
        pixels = torch.tensor(np.array(image)).permute(2, 0, 1).unsqueeze(0).float()
        return pixels / 127.5 - 1.0

    def fine_tune_sft(self, images, texts, epochs=3):
        """Fine-tune the pipeline's UNet on image/caption pairs with a noise-prediction MSE loss.

        The loop reads ``unet``, ``vae``, ``text_encoder``, ``tokenizer`` and
        ``scheduler`` from the pipeline. A pipeline without them is rejected
        up front with a clear message.
        NOTE(review): transformer-based pipelines presumably need a different
        training loop; confirm before relying on this for real FLUX weights.

        Args:
            images: Images, one per caption (PIL images or HxWxC arrays).
            texts: Captions, index-aligned with ``images``.
            epochs: Number of passes over the data.

        Returns:
            ``self``.

        Raises:
            ValueError: if there are no samples.
            AttributeError: if the pipeline lacks a component this loop needs.
            Exception: anything else raised while training, after reporting.
        """
        try:
            pipe = self.pipeline
            missing = [part for part in self._TRAINING_PARTS if getattr(pipe, part, None) is None]
            if missing:
                raise AttributeError(
                    f"Pipeline is missing {', '.join(missing)}; "
                    "fine-tuning needs a UNet-based text-to-image pipeline"
                )
            dataset = DiffusionDataset(images, texts)
            if len(dataset) == 0:
                raise ValueError("No training samples provided")
            # The default collate function cannot batch PIL images; with
            # batch_size=1 the single sample is simply passed through.
            dataloader = DataLoader(dataset, batch_size=1, shuffle=True,
                                    collate_fn=lambda samples: samples[0])
            optimizer = torch.optim.AdamW(pipe.unet.parameters(), lr=1e-5)
            pipe.unet.train()
            device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
            num_train_timesteps = pipe.scheduler.config.num_train_timesteps
            # NOTE(review): assumes an AutoencoderKL-style VAE; falls back to 1.0
            # when the config has no scaling factor.
            latent_scale = getattr(pipe.vae.config, "scaling_factor", 1.0)
            for epoch in range(epochs):
                with st.spinner(f"Epoch {epoch + 1}/{epochs}... ⚙️ (Flux titan powering up!)"):
                    total_loss = 0
                    for sample in dataloader:
                        optimizer.zero_grad()
                        # Only the UNet is optimised, so the encoders run without autograd.
                        with torch.no_grad():
                            pixels = self._to_pixel_tensor(sample["image"]).to(device)
                            latents = pipe.vae.encode(pixels).latent_dist.sample() * latent_scale
                            token_ids = pipe.tokenizer(
                                sample["text"],
                                padding="max_length",
                                max_length=pipe.tokenizer.model_max_length,
                                truncation=True,
                                return_tensors="pt",
                            ).input_ids.to(device)
                            text_embeddings = pipe.text_encoder(token_ids)[0]
                        noise = torch.randn_like(latents)
                        timesteps = torch.randint(0, num_train_timesteps, (latents.shape[0],), device=latents.device)
                        noisy_latents = pipe.scheduler.add_noise(latents, noise, timesteps)
                        pred_noise = pipe.unet(noisy_latents, timesteps, encoder_hidden_states=text_embeddings).sample
                        loss = torch.nn.functional.mse_loss(pred_noise, noise)
                        loss.backward()
                        optimizer.step()
                        total_loss += loss.item()
                    st.write(f"Epoch {epoch + 1} done! Loss: {total_loss / len(dataloader):.4f}")
            # Leave the UNet in inference mode for later generate() calls.
            pipe.unet.eval()
            st.success(f"FLUX Diffusion tuned! 🎉 {random.choice(self.jokes)}")
            tuned_name = self.config.name if self.config else "unnamed"
            logger.info(f"Fine-tuned FLUX.1 Distilled: {tuned_name}")
        except Exception as e:
            st.error(f"Tuning failed: {str(e)} 💥 (Flux snag!)")
            logger.error(f"Tuning failed: {str(e)}")
            raise
        return self

    def save_model(self, path: str):
        """Write the pipeline to ``path``, creating the parent directory if there is one.

        Raises:
            Exception: whatever saving raised, after it has been reported.
        """
        try:
            with st.spinner("Saving model... 💾 (Packing flux magic!)"):
                parent = os.path.dirname(path)
                if parent:  # a bare name has no parent; makedirs("") would fail
                    os.makedirs(parent, exist_ok=True)
                self.pipeline.save_pretrained(path)
            st.success(f"Saved at {path}! ✅ Flux titan secured!")
            logger.info(f"Saved at {path}")
        except Exception as e:
            st.error(f"Save failed: {str(e)} 💥 (Fluxy mishap!)")
            logger.error(f"Save failed: {str(e)}")
            raise

    def generate(self, prompt: str):
        """Run the pipeline on ``prompt`` with 50 inference steps and return the first image.

        Raises:
            Exception: whatever the pipeline raised, after it has been reported.
        """
        try:
            return self.pipeline(prompt, num_inference_steps=50).images[0]
        except Exception as e:
            st.error(f"Generation failed: {str(e)} 💥 (Flux oopsie!)")
            logger.error(f"Generation failed: {str(e)}")
            raise
|
|
|
def generate_filename(sequence, ext="png"):
    """Build a file name from the current US/Central time, a sequence tag and an extension.

    The result has the form ``MM-DD-YYYY-HH-MM-SS-AM|PM-<sequence>.<ext>``
    (12-hour clock).
    """
    import pytz
    from datetime import datetime

    now_central = datetime.now(pytz.timezone('US/Central'))
    stamp = now_central.strftime('%m-%d-%Y-%I-%M-%S-%p')
    return "-".join((stamp, f"{sequence}.{ext}"))
|
|
|
def get_download_link(file_path, mime_type="text/plain", label="Download"):
    """Return an HTML ``<a>`` tag that embeds ``file_path`` as a base64 data URI.

    The file name and the label are HTML-escaped because callers render the
    result with ``unsafe_allow_html=True`` and file names can come from user
    uploads.

    Args:
        file_path: File to embed.
        mime_type: MIME type placed in the data URI.
        label: Link text; `` 📥`` is appended.

    Returns:
        The anchor markup, or an ``Error: ...`` string when the file could not
        be read (the failure is logged, not raised).
    """
    import html  # local import: keeps this function self-contained

    try:
        with open(file_path, 'rb') as f:
            encoded = base64.b64encode(f.read()).decode()
        download_name = html.escape(os.path.basename(file_path), quote=True)
        return (
            f'<a href="data:{mime_type};base64,{encoded}" '
            f'download="{download_name}">{html.escape(label)} 📥</a>'
        )
    except Exception as e:
        logger.error(f"Failed to generate link for {file_path}: {str(e)}")
        return f"Error: Could not generate link for {html.escape(str(file_path))}"
|
|
|
def zip_files(files, zip_path):
    """Create a deflate-compressed ZIP at ``zip_path`` from ``files``.

    Plain files are stored under their base name. A directory is stored as an
    entry under its base name and then walked, so everything beneath it is
    archived below that name (writing the directory alone would add only an
    empty entry).

    Args:
        files: Paths of files and/or directories to add.
        zip_path: Destination archive path.

    Raises:
        Exception: whatever archiving raised, after it has been logged.
    """
    try:
        with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
            for file in files:
                if not os.path.isdir(file):
                    zipf.write(file, os.path.basename(file))
                    continue
                # normpath drops a trailing separator so basename is never empty.
                root_name = os.path.basename(os.path.normpath(file))
                zipf.write(file, root_name)
                for folder, _subdirs, names in os.walk(file):
                    relative = os.path.relpath(folder, file)
                    for name in names:
                        arcname = os.path.normpath(os.path.join(root_name, relative, name))
                        zipf.write(os.path.join(folder, name), arcname)
        logger.info(f"Created ZIP file: {zip_path}")
    except Exception as e:
        logger.error(f"Failed to create ZIP {zip_path}: {str(e)}")
        raise
|
|
|
def delete_files(files):
    """Remove each path in ``files`` from disk and from the session's captured-image list.

    Raises:
        Exception: whatever removal raised, after it has been logged.
    """
    try:
        for path in files:
            os.remove(path)
            logger.info(f"Deleted file: {path}")
        still_present = []
        for snap in st.session_state['captured_images']:
            if snap not in files:
                still_present.append(snap)
        st.session_state['captured_images'] = still_present
    except Exception as err:
        logger.error(f"Failed to delete files: {str(err)}")
        raise
|
|
|
def get_model_files():
    """List the directories directly under ``diffusion_models/``."""
    candidates = glob.glob("diffusion_models/*")
    return list(filter(os.path.isdir, candidates))
|
|
|
def get_gallery_files(file_types):
    """Return the sorted, de-duplicated files in the working directory matching ``*.<ext>`` for each extension."""
    matches = set()
    for extension in file_types:
        matches.update(glob.glob(f"*.{extension}"))
    return sorted(matches)
|
|
|
def update_gallery():
    """Render PNG files from the working directory in two sidebar columns.

    At most ``gallery_size * 2`` files are shown (``gallery_size`` is a
    module-level value), alternating between the columns, each with a
    download link underneath.
    """
    media_files = get_gallery_files(["png"])
    if not media_files:
        return
    column_pair = st.sidebar.columns(2)
    visible = media_files[:gallery_size * 2]
    for position, path in enumerate(visible):
        target = column_pair[position % 2]
        with target:
            st.image(Image.open(path), caption=path, use_container_width=True)
            link_html = get_download_link(path, "image/png", "Download Snap 📸")
            st.markdown(link_html, unsafe_allow_html=True)
|
|
|
def get_available_video_devices():
    """Probe camera indices 0-5 with OpenCV and return display names for those that open.

    Each index is tried with the V4L2 backend first and then with OpenCV's
    default backend. When nothing opens, or probing raises, the full default
    list ``Camera 0`` .. ``Camera 5`` is returned instead.
    """
    fallback = [f"Camera {i} 🎥" for i in range(6)]
    try:
        found = []
        for index in range(6):
            capture = cv2.VideoCapture(index, cv2.CAP_V4L2)
            if not capture.isOpened():
                capture = cv2.VideoCapture(index)
            if capture.isOpened():
                found.append(f"Camera {index} 🎥")
                logger.info(f"Detected camera at index {index}")
            capture.release()
        if not found:
            logger.warning("No cameras detected by OpenCV; using defaults")
            return fallback
        return found
    except Exception as err:
        logger.error(f"Error detecting cameras: {str(err)} - Falling back to defaults")
        return fallback
|
|
|
st.title("SFT Tiny Titans 🚀 (Small Diffusion Delight!)")

# Sidebar gallery: the slider value is read by update_gallery().
st.sidebar.header("Media Gallery 🎨")
gallery_size = st.sidebar.slider("Gallery Size 📸", 1, 10, 4, help="How many snaps to flaunt? 🌟")
update_gallery()

# Bulk actions over every PNG in the working directory.
col1, col2 = st.sidebar.columns(2)
with col1:
    download_clicked = st.button("Download All 📦")
    if download_clicked:
        media_files = get_gallery_files(["png"])
        if not media_files:
            st.sidebar.warning("No snaps to zip! 📸 Snap some first!")
        else:
            zip_path = f"snapshot_collection_{int(time.time())}.zip"
            zip_files(media_files, zip_path)
            bundle_link = get_download_link(zip_path, "application/zip", "Download All Snaps 📦")
            st.sidebar.markdown(bundle_link, unsafe_allow_html=True)
            st.sidebar.success("Snaps zipped! 🎉 Grab your loot!")
with col2:
    delete_clicked = st.button("Delete All 🗑️")
    if delete_clicked:
        media_files = get_gallery_files(["png"])
        if not media_files:
            st.sidebar.warning("Nothing to delete! 📸 Snap some pics!")
        else:
            delete_files(media_files)
            st.sidebar.success("Snaps vanquished! 🧹 Gallery cleared!")
            update_gallery()
|
|
|
# Sidebar uploader: every uploaded file is written into the working directory.
uploaded_files = st.sidebar.file_uploader("Upload Goodies 🎵🎥🖼️📝📜", type=["mp3", "mp4", "png", "jpeg", "md", "pdf", "docx"], accept_multiple_files=True)
if uploaded_files:
    for uploaded_file in uploaded_files:
        # The name is supplied by the client. Keep only its last path component
        # (for either separator style) so a write can never land outside the
        # working directory.
        filename = os.path.basename(uploaded_file.name.replace("\\", "/"))
        if not filename or filename in (".", ".."):
            logger.warning(f"Skipped upload with unusable name: {uploaded_file.name!r}")
            continue
        with open(filename, "wb") as f:
            f.write(uploaded_file.getvalue())
        logger.info(f"Uploaded file: {filename}")
|
|
|
# Second sidebar gallery: PNG and JPEG files, each with a typed download link.
st.sidebar.subheader("Image Gallery 🖼️")
image_files = get_gallery_files(["png", "jpeg"])
if image_files:
    cols = st.sidebar.columns(2)
    shown_images = image_files[:gallery_size * 2]
    for slot, picture in enumerate(shown_images):
        with cols[slot % 2]:
            st.image(Image.open(picture), caption=picture, use_container_width=True)
            if picture.endswith(".png"):
                picture_mime = "image/png"
            else:
                picture_mime = "image/jpeg"
            st.markdown(get_download_link(picture, picture_mime, "Save Pic 🖼️"), unsafe_allow_html=True)
|
|
|
# Sidebar: pick a saved model directory and a builder type, then load it.
st.sidebar.subheader("Model Management 🗂️")
model_dirs = get_model_files()
selected_model = st.sidebar.selectbox("Select Saved Model", ["None"] + model_dirs)
model_type = st.sidebar.selectbox("Diffusion Type", ["Micro Diffusion", "Latent Diffusion", "FLUX.1 Distilled"])
# The button is only rendered once a real model directory is selected.
if selected_model != "None" and st.sidebar.button("Load Model 📂"):
    builder_classes = {
        "Micro Diffusion": MicroDiffusionBuilder,
        "Latent Diffusion": LatentDiffusionBuilder,
        "FLUX.1 Distilled": FluxDiffusionBuilder,
    }
    builder = builder_classes[model_type]()
    config = DiffusionConfig(name=os.path.basename(selected_model), base_model="unknown", size="small")
    try:
        builder.load_model(selected_model, config)
        st.session_state['cv_builder'] = builder
        st.session_state['cv_loaded'] = True
        st.rerun()
    except Exception as e:
        st.error(f"Model load failed: {str(e)} 💥 (Check logs for details!)")

# Sidebar: one-line summary of whether a builder is loaded.
st.sidebar.subheader("Model Status 🚦")
if st.session_state['cv_loaded']:
    load_word = 'Loaded'
else:
    load_word = 'Not Loaded'
builder_kinds = (MicroDiffusionBuilder, LatentDiffusionBuilder, FluxDiffusionBuilder)
if st.session_state['cv_loaded'] and isinstance(st.session_state.get('cv_builder'), builder_kinds):
    active_note = '(Active)'
else:
    active_note = ''
st.sidebar.write(f"**CV Model**: {load_word} {active_note}")
|
|
|
tabs = ["Build Titan 🌱", "Camera Snap 📷", "Fine-Tune Titan (CV) 🔧", "Test Titan (CV) 🧪", "Agentic RAG Party (CV) 🌐"]
tab1, tab2, tab3, tab4, tab5 = st.tabs(tabs)

# Bookkeeping for the "active tab" name and the per-tab `tab<N>_active` flags
# kept in session state.
for position, tab_title in enumerate(tabs):
    flag_key = f'tab{position}_active'
    was_flagged = st.session_state.get(flag_key, False)
    if st.session_state['active_tab'] != tab_title and was_flagged:
        logger.info(f"Switched to tab: {tab_title}")
        st.session_state['active_tab'] = tab_title
    st.session_state[flag_key] = (st.session_state['active_tab'] == tab_title)
|
|
|
# Tab 1: download a base model, save it under a new name, and make it the session's builder.
with tab1:
    st.header("Build Titan 🌱")
    build_kinds = ["Micro Diffusion", "Latent Diffusion", "FLUX.1 Distilled"]
    model_type = st.selectbox("Diffusion Type", build_kinds, key="build_type")
    # Each builder type offers exactly one base model.
    if model_type == "Micro Diffusion":
        offered_base = "CompVis/ldm-text2im-large-256"
    elif model_type == "Latent Diffusion":
        offered_base = "runwayml/stable-diffusion-v1-5"
    else:
        offered_base = "black-forest-labs/flux.1-distilled"
    base_model = st.selectbox("Select Tiny Model", [offered_base])
    model_name = st.text_input("Model Name", f"tiny-titan-{int(time.time())}")
    if st.button("Download Model ⬇️"):
        config = DiffusionConfig(name=model_name, base_model=base_model, size="small")
        builder_classes = {
            "Micro Diffusion": MicroDiffusionBuilder,
            "Latent Diffusion": LatentDiffusionBuilder,
            "FLUX.1 Distilled": FluxDiffusionBuilder,
        }
        builder = builder_classes[model_type]()
        try:
            builder.load_model(base_model, config)
            builder.save_model(config.model_path)
            st.session_state['cv_builder'] = builder
            st.session_state['cv_loaded'] = True
            st.rerun()
        except Exception as e:
            st.error(f"Model build failed: {str(e)} 💥 (Check logs for details!)")
|
|
|
def _persist_snapshot(cam_index, snapshot):
    """Write a camera frame to disk once and return its file name.

    ``st.camera_input`` keeps returning the same frame on every script rerun
    until the user clears it. The frame's hash and the file written for it are
    remembered in session state so a rerun reuses that file instead of saving
    another timestamped copy.
    """
    payload = snapshot.getvalue()
    fingerprint = hash(payload)  # stable within this process, which is all session state needs
    hash_key = f"cam{cam_index}_last_hash"
    file_key = f"cam{cam_index}_last_file"
    previous = st.session_state.get(file_key)
    if st.session_state.get(hash_key) == fingerprint and previous and os.path.exists(previous):
        return previous
    filename = generate_filename(f"cam{cam_index}")
    with open(filename, "wb") as f:
        f.write(payload)
    logger.info(f"Saved snapshot from Camera {cam_index}: {filename}")
    st.session_state['captured_images'].append(filename)
    st.session_state[hash_key] = fingerprint
    st.session_state[file_key] = filename
    return filename


# Tab 2: two browser camera inputs, each with its own label/help/visibility settings.
with tab2:
    st.header("Camera Snap 📷 (Dual Capture Fiesta!)")
    video_devices = get_available_video_devices()
    st.write(f"🎉 Detected Cameras: {', '.join(video_devices)}")
    st.info("Switch cams in your browser settings (e.g., Chrome > Privacy > Camera) since I’m a browser star! 🌟")

    st.subheader("Camera 0 🎬 - Lights, Camera, Action!")
    cam0_cols = st.columns(4)
    with cam0_cols[0]:
        cam0_device = st.selectbox("Cam 📷", video_devices, index=0, key="cam0_device", help="Pick your star cam! 🌟")
    with cam0_cols[1]:
        cam0_label = st.text_input("Tag 🏷️", "Cam 0 Snap", key="cam0_label", help="Name your masterpiece! 🎨")
    with cam0_cols[2]:
        cam0_help = st.text_input("Hint 💡", "Snap a heroic moment! 🦸♂️", key="cam0_help", help="Give a fun tip!")
    with cam0_cols[3]:
        cam0_vis = st.selectbox("Show 🖼️", ["visible", "hidden", "collapsed"], index=0, key="cam0_vis", help="Label vibes: Visible, Sneaky, or Gone!")

    st.subheader("Camera 1 🎥 - Roll the Film!")
    cam1_cols = st.columns(4)
    with cam1_cols[0]:
        cam1_device = st.selectbox("Cam 📷", video_devices, index=1 if len(video_devices) > 1 else 0, key="cam1_device", help="Choose your blockbuster cam! 🎬")
    with cam1_cols[1]:
        cam1_label = st.text_input("Tag 🏷️", "Cam 1 Snap", key="cam1_label", help="Title your epic shot! 🌠")
    with cam1_cols[2]:
        cam1_help = st.text_input("Hint 💡", "Grab an epic frame! 🌟", key="cam1_help", help="Drop a cheeky hint!")
    with cam1_cols[3]:
        cam1_vis = st.selectbox("Show 🖼️", ["visible", "hidden", "collapsed"], index=0, key="cam1_vis", help="Label style: Show it, Hide it, Poof!")

    cols = st.columns(2)
    with cols[0]:
        st.subheader(f"Camera 0 ({cam0_device}) 🎬")
        cam0_img = st.camera_input(
            label=cam0_label,
            key="cam0",
            help=cam0_help,
            disabled=False,
            label_visibility=cam0_vis,
        )
        if cam0_img:
            filename = _persist_snapshot(0, cam0_img)
            st.image(Image.open(filename), caption=filename, use_container_width=True)
            update_gallery()
        st.info("🚨 One snap at a time—your Titan’s too cool for bursts! 😎")
    with cols[1]:
        st.subheader(f"Camera 1 ({cam1_device}) 🎥")
        cam1_img = st.camera_input(
            label=cam1_label,
            key="cam1",
            help=cam1_help,
            disabled=False,
            label_visibility=cam1_vis,
        )
        if cam1_img:
            filename = _persist_snapshot(1, cam1_img)
            st.image(Image.open(filename), caption=filename, use_container_width=True)
            update_gallery()
        st.info("🚨 Single shots only—craft your masterpiece! 🎨")
|
|
|
def _snapshot_tag(path):
    """Return the last dash-separated token of a file's stem (``cam0`` for a camera snapshot).

    Files that do not follow the snapshot naming scheme (e.g. uploaded PNGs)
    yield their whole stem instead of raising.
    """
    stem = os.path.splitext(os.path.basename(path))[0]
    return stem.rsplit('-', 1)[-1]


def _export_dataset(pairs, path, md_title=None):
    """Write image/text pairs to ``path`` as CSV, or as a Markdown list when ``md_title`` is given."""
    if md_title is None:
        with open(path, "w", newline="", encoding="utf-8") as f:
            writer = csv.writer(f)
            writer.writerow(["image", "text"])
            for _, row in pairs.iterrows():
                writer.writerow([row["image"], row["text"]])
    else:
        with open(path, "w", encoding="utf-8") as f:
            f.write(f"# {md_title}\n\n")
            for _, row in pairs.iterrows():
                f.write(f"- `{row['image']}`: {row['text']}\n")


# One entry per fine-tuning use case; the three flows differ only in these values.
_FINE_TUNE_USE_CASES = (
    {
        "title": "Use Case 1: Denoise Snapshots 🌟",
        "caption": "Denoised {} snap",
        "editor_help": "Craft denoising pairs! 🌟",
        "button": "Fine-Tune Denoising 🔄",
        "suffix": "denoise",
        "running": "Fine-tuning for denoising... ⏳ (Polishing pixels!)",
        "done": "Denoising tuned! 🎉 (Pixel shine unleashed!)",
        "model_link": "Download Denoised Titan 📦",
        "export_ext": "csv",
        "export_mime": "text/csv",
        "export_link": "Download Denoising CSV 📜",
        "md_title": None,
    },
    {
        "title": "Use Case 2: Stylize Snapshots 🎨",
        "caption": "Neon {} style",
        "editor_help": "Craft stylized pairs! 🎨",
        "button": "Fine-Tune Stylization 🔄",
        "suffix": "stylize",
        "running": "Fine-tuning for stylization... ⏳ (Painting pixels!)",
        "done": "Stylization tuned! 🎉 (Pixel art unleashed!)",
        "model_link": "Download Stylized Titan 📦",
        "export_ext": "md",
        "export_mime": "text/markdown",
        "export_link": "Download Stylization MD 📝",
        "md_title": "Stylization Dataset",
    },
    {
        "title": "Use Case 3: Multi-Angle Snapshots 🌐",
        "caption": "View from {}",
        "editor_help": "Craft multi-angle pairs! 🌐",
        "button": "Fine-Tune Multi-Angle 🔄",
        "suffix": "multiangle",
        "running": "Fine-tuning for multi-angle... ⏳ (Spinning pixels!)",
        "done": "Multi-angle tuned! 🎉 (Pixel views unleashed!)",
        "model_link": "Download Multi-Angle Titan 📦",
        "export_ext": "csv",
        "export_mime": "text/csv",
        "export_link": "Download Multi-Angle CSV 📜",
        "md_title": None,
    },
)

# Tab 3: edit image/caption pairs built from PNG snapshots, fine-tune the loaded
# builder on them, then offer the saved model and the dataset for download.
with tab3:
    st.header("Fine-Tune Titan (CV) 🔧 (Sculpt Your Pixel Prodigy!)")
    if not st.session_state['cv_loaded'] or not isinstance(st.session_state['cv_builder'], (MicroDiffusionBuilder, LatentDiffusionBuilder, FluxDiffusionBuilder)):
        st.warning("Please build or load a CV Titan first! ⚠️ (No artist, no canvas!)")
    else:
        captured_images = get_gallery_files(["png"])
        if len(captured_images) >= 2:
            for case in _FINE_TUNE_USE_CASES:
                st.subheader(case["title"])
                seed_rows = [
                    {"image": img, "text": case["caption"].format(_snapshot_tag(img))}
                    for img in captured_images[:10]
                ]
                edited = st.data_editor(pd.DataFrame(seed_rows), num_rows="dynamic", help=case["editor_help"])
                if not st.button(case["button"]):
                    continue
                # Rows added in the editor may be left blank; they cannot be trained on.
                pairs = edited.dropna(subset=["image", "text"])
                if pairs.empty:
                    st.warning("No image/text pairs to train on! ⚠️")
                    continue
                images = [Image.open(row["image"]) for _, row in pairs.iterrows()]
                texts = [row["text"] for _, row in pairs.iterrows()]
                builder = st.session_state['cv_builder']
                new_model_name = f"{builder.config.name}-{case['suffix']}-{int(time.time())}"
                new_config = DiffusionConfig(name=new_model_name, base_model=builder.config.base_model, size="small")
                builder.config = new_config
                with st.status(case["running"], expanded=True) as status:
                    builder.fine_tune_sft(images, texts)
                    builder.save_model(new_config.model_path)
                    status.update(label=case["done"], state="complete")
                    zip_path = f"{new_config.model_path}.zip"
                    zip_files([new_config.model_path], zip_path)
                    st.markdown(get_download_link(zip_path, "application/zip", case["model_link"]), unsafe_allow_html=True)
                export_path = f"{case['suffix']}_dataset_{int(time.time())}.{case['export_ext']}"
                _export_dataset(pairs, export_path, case["md_title"])
                st.markdown(get_download_link(export_path, case["export_mime"], case["export_link"]), unsafe_allow_html=True)
|
|
|
# Tab 4: generate one image from a free-text prompt with the loaded builder.
with tab4:
    st.header("Test Titan (CV) 🧪 (Unleash Your Pixel Power!)")
    titan_ready = st.session_state['cv_loaded'] and isinstance(
        st.session_state['cv_builder'],
        (MicroDiffusionBuilder, LatentDiffusionBuilder, FluxDiffusionBuilder),
    )
    if not titan_ready:
        st.warning("Please build or load a CV Titan first! ⚠️ (No artist, no masterpiece!)")
    else:
        st.subheader("Test Your Titan 🎨")
        test_prompt = st.text_area("Prompt 🎤", "Neon glow from cam0", help="Dream up a wild image—your Titan’s ready to paint! 🖌️")
        generate_clicked = st.button("Generate ▶️")
        if generate_clicked:
            with st.spinner("Crafting your masterpiece... ⏳ (Titan’s mixing pixels!)"):
                image = st.session_state['cv_builder'].generate(test_prompt)
                st.image(image, caption=f"Generated: {test_prompt}", use_container_width=True)
|
|
|
# Tab 5: render a fixed list of three prompts with the loaded builder's pipeline.
with tab5:
    st.header("Agentic RAG Party (CV) 🌐 (Pixel Party Extravaganza!)")
    st.write("Generate superhero party vibes from your tuned Titan! 🎉")
    party_ready = st.session_state['cv_loaded'] and isinstance(
        st.session_state['cv_builder'],
        (MicroDiffusionBuilder, LatentDiffusionBuilder, FluxDiffusionBuilder),
    )
    if not party_ready:
        st.warning("Please build or load a CV Titan first! ⚠️ (No artist, no party!)")
    else:
        if st.button("Run RAG Demo 🎉"):
            with st.spinner("Loading your pixel party titan... ⏳ (Titan’s grabbing its brush!)"):
                class CVPartyAgent:
                    """Thin wrapper around a pipeline: a fixed prompt plan plus image generation."""

                    def __init__(self, pipeline):
                        self.pipeline = pipeline

                    def generate(self, prompt: str) -> Image.Image:
                        """Run the pipeline with 50 inference steps and return the first image."""
                        result = self.pipeline(prompt, num_inference_steps=50)
                        return result.images[0]

                    def plan_party(self):
                        """Return a DataFrame with one ``Theme`` / ``Image Idea`` row per prompt."""
                        ideas = (
                            "Gold-plated Batman statue from cam0",
                            "VR superhero battle scene from cam1",
                            "Neon-lit Avengers tower from cam2",
                        )
                        rows = []
                        for number, idea in enumerate(ideas, start=1):
                            rows.append({"Theme": f"Scene {number}", "Image Idea": idea})
                        return pd.DataFrame(rows)

                agent = CVPartyAgent(st.session_state['cv_builder'].pipeline)
                st.write("Party agent ready! 🎨 (Time to paint an epic bash!)")
            with st.spinner("Crafting superhero party visuals... ⏳ (Pixels assemble!)"):
                try:
                    plan_df = agent.plan_party()
                    st.dataframe(plan_df)
                    for _, scene in plan_df.iterrows():
                        image = agent.generate(scene["Image Idea"])
                        st.image(image, caption=f"{scene['Theme']} - {scene['Image Idea']}", use_container_width=True)
                except Exception as e:
                    st.error(f"Party crashed: {str(e)} 💥 (Pixel oopsie!)")
                    logger.error(f"RAG demo failed: {str(e)}")
|
|
|
# Sidebar: every log record captured during this run, one line each.
st.sidebar.subheader("Action Logs 📜")
log_container = st.sidebar.empty()
# Used only for its default timestamp formatting when a record carries no
# `asctime` (that attribute exists only after a formatter has processed it).
_log_clock = logging.Formatter()
# st.empty() holds a single element, so writing the records straight into it
# would leave only the last one visible; a nested container keeps them all.
with log_container.container():
    for record in log_records:
        stamp = getattr(record, "asctime", None) or _log_clock.formatTime(record)
        st.write(f"{stamp} - {record.levelname} - {record.getMessage()}")

update_gallery()