#!/usr/bin/env python3 import os import glob import base64 import streamlit as st import pandas as pd import torch from torch.utils.data import Dataset, DataLoader import csv import time from dataclasses import dataclass from typing import Optional import zipfile import math from PIL import Image import random import logging import numpy as np import cv2 from diffusers import DiffusionPipeline logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") logger = logging.getLogger(__name__) log_records = [] class LogCaptureHandler(logging.Handler): def emit(self, record): log_records.append(record) logger.addHandler(LogCaptureHandler()) st.set_page_config( page_title="SFT Tiny Titans πŸš€", page_icon="πŸ€–", layout="wide", initial_sidebar_state="expanded", menu_items={ 'Get Help': 'https://huggingface.co/awacke1', 'Report a Bug': 'https://huggingface.co/spaces/awacke1', 'About': "Tiny Titans: Small diffusion models, big CV dreams! 🌌" } ) if 'captured_images' not in st.session_state: st.session_state['captured_images'] = [] if 'cv_builder' not in st.session_state: st.session_state['cv_builder'] = None if 'cv_loaded' not in st.session_state: st.session_state['cv_loaded'] = False if 'active_tab' not in st.session_state: st.session_state['active_tab'] = "Build Titan 🌱" @dataclass class DiffusionConfig: """Config for our diffusion heroes πŸ¦Έβ€β™‚οΈ - Keeps the blueprint snappy!""" name: str base_model: str size: str @property def model_path(self): return f"diffusion_models/{self.name}" class DiffusionDataset(Dataset): """Pixel party platter πŸ• - Images and text for diffusion delight!""" def __init__(self, images, texts): self.images = images self.texts = texts def __len__(self): return len(self.images) def __getitem__(self, idx): return {"image": self.images[idx], "text": self.texts[idx]} class MicroDiffusionBuilder: """Tiny titan of diffusion 🐣 - Small but mighty for quick demos!""" def __init__(self): self.config = None self.pipeline = None self.jokes = ["Micro but mighty! πŸ’ͺ", "Small pixels, big dreams! 🌟"] def load_model(self, model_path: str, config: Optional[DiffusionConfig] = None): try: with st.spinner(f"Loading {model_path}... ⏳ (Tiny titan powering up!)"): self.pipeline = DiffusionPipeline.from_pretrained(model_path, low_cpu_mem_usage=True) self.pipeline.to("cuda" if torch.cuda.is_available() else "cpu") if config: self.config = config st.success(f"Model loaded! πŸŽ‰ {random.choice(self.jokes)}") logger.info(f"Loaded Micro Diffusion: {model_path}") except Exception as e: st.error(f"Failed to load {model_path}: {str(e)} πŸ’₯ (Tiny titan tripped!)") logger.error(f"Failed to load {model_path}: {str(e)}") raise return self def fine_tune_sft(self, images, texts, epochs=3): try: dataset = DiffusionDataset(images, texts) dataloader = DataLoader(dataset, batch_size=1, shuffle=True) optimizer = torch.optim.AdamW(self.pipeline.unet.parameters(), lr=1e-5) self.pipeline.unet.train() device = torch.device("cuda" if torch.cuda.is_available() else "cpu") for epoch in range(epochs): with st.spinner(f"Epoch {epoch + 1}/{epochs}... βš™οΈ (Micro titan flexing!)"): total_loss = 0 for batch in dataloader: optimizer.zero_grad() image = batch["image"][0].to(device) text = batch["text"][0] latents = self.pipeline.vae.encode(torch.tensor(np.array(image)).permute(2, 0, 1).unsqueeze(0).float().to(device)).latent_dist.sample() noise = torch.randn_like(latents) timesteps = torch.randint(0, self.pipeline.scheduler.num_train_timesteps, (latents.shape[0],), device=latents.device) noisy_latents = self.pipeline.scheduler.add_noise(latents, noise, timesteps) text_embeddings = self.pipeline.text_encoder(self.pipeline.tokenizer(text, return_tensors="pt").input_ids.to(device))[0] pred_noise = self.pipeline.unet(noisy_latents, timesteps, encoder_hidden_states=text_embeddings).sample loss = torch.nn.functional.mse_loss(pred_noise, noise) loss.backward() optimizer.step() total_loss += loss.item() st.write(f"Epoch {epoch + 1} done! Loss: {total_loss / len(dataloader):.4f}") st.success(f"Micro Diffusion tuned! πŸŽ‰ {random.choice(self.jokes)}") logger.info(f"Fine-tuned Micro Diffusion: {self.config.name}") except Exception as e: st.error(f"Tuning failed: {str(e)} πŸ’₯ (Micro snag!)") logger.error(f"Tuning failed: {str(e)}") raise return self def save_model(self, path: str): try: with st.spinner("Saving model... πŸ’Ύ (Packing tiny pixels!)"): os.makedirs(os.path.dirname(path), exist_ok=True) self.pipeline.save_pretrained(path) st.success(f"Saved at {path}! βœ… Tiny titan secured!") logger.info(f"Saved at {path}") except Exception as e: st.error(f"Save failed: {str(e)} πŸ’₯ (Packing mishap!)") logger.error(f"Save failed: {str(e)}") raise def generate(self, prompt: str): try: return self.pipeline(prompt, num_inference_steps=20).images[0] except Exception as e: st.error(f"Generation failed: {str(e)} πŸ’₯ (Pixel oopsie!)") logger.error(f"Generation failed: {str(e)}") raise class LatentDiffusionBuilder: """Scaled-down dreamer πŸŒ™ - Latent magic for efficient artistry!""" def __init__(self): self.config = None self.pipeline = None self.jokes = ["Latent vibes only! πŸŒ€", "Small scale, big style! 🎨"] def load_model(self, model_path: str, config: Optional[DiffusionConfig] = None): try: with st.spinner(f"Loading {model_path}... ⏳ (Latent titan rising!)"): self.pipeline = DiffusionPipeline.from_pretrained(model_path, low_cpu_mem_usage=True) self.pipeline.unet = torch.nn.Sequential(*list(self.pipeline.unet.children())[:2]) # Scale down U-Net self.pipeline.to("cuda" if torch.cuda.is_available() else "cpu") if config: self.config = config st.success(f"Model loaded! πŸŽ‰ {random.choice(self.jokes)}") logger.info(f"Loaded Latent Diffusion: {model_path}") except Exception as e: st.error(f"Failed to load {model_path}: {str(e)} πŸ’₯ (Latent hiccup!)") logger.error(f"Failed to load {model_path}: {str(e)}") raise return self def fine_tune_sft(self, images, texts, epochs=3): try: dataset = DiffusionDataset(images, texts) dataloader = DataLoader(dataset, batch_size=1, shuffle=True) optimizer = torch.optim.AdamW(self.pipeline.unet.parameters(), lr=1e-5) self.pipeline.unet.train() device = torch.device("cuda" if torch.cuda.is_available() else "cpu") for epoch in range(epochs): with st.spinner(f"Epoch {epoch + 1}/{epochs}... βš™οΈ (Latent titan shaping up!)"): total_loss = 0 for batch in dataloader: optimizer.zero_grad() image = batch["image"][0].to(device) text = batch["text"][0] latents = self.pipeline.vae.encode(torch.tensor(np.array(image)).permute(2, 0, 1).unsqueeze(0).float().to(device)).latent_dist.sample() noise = torch.randn_like(latents) timesteps = torch.randint(0, self.pipeline.scheduler.num_train_timesteps, (latents.shape[0],), device=latents.device) noisy_latents = self.pipeline.scheduler.add_noise(latents, noise, timesteps) text_embeddings = self.pipeline.text_encoder(self.pipeline.tokenizer(text, return_tensors="pt").input_ids.to(device))[0] pred_noise = self.pipeline.unet(noisy_latents, timesteps, encoder_hidden_states=text_embeddings).sample loss = torch.nn.functional.mse_loss(pred_noise, noise) loss.backward() optimizer.step() total_loss += loss.item() st.write(f"Epoch {epoch + 1} done! Loss: {total_loss / len(dataloader):.4f}") st.success(f"Latent Diffusion tuned! πŸŽ‰ {random.choice(self.jokes)}") logger.info(f"Fine-tuned Latent Diffusion: {self.config.name}") except Exception as e: st.error(f"Tuning failed: {str(e)} πŸ’₯ (Latent snag!)") logger.error(f"Tuning failed: {str(e)}") raise return self def save_model(self, path: str): try: with st.spinner("Saving model... πŸ’Ύ (Packing latent dreams!)"): os.makedirs(os.path.dirname(path), exist_ok=True) self.pipeline.save_pretrained(path) st.success(f"Saved at {path}! βœ… Latent titan stashed!") logger.info(f"Saved at {path}") except Exception as e: st.error(f"Save failed: {str(e)} πŸ’₯ (Dreamy mishap!)") logger.error(f"Save failed: {str(e)}") raise def generate(self, prompt: str): try: return self.pipeline(prompt, num_inference_steps=30).images[0] except Exception as e: st.error(f"Generation failed: {str(e)} πŸ’₯ (Latent oopsie!)") logger.error(f"Generation failed: {str(e)}") raise class FluxDiffusionBuilder: """Distilled dynamo ⚑ - High-quality pixels in a small package!""" def __init__(self): self.config = None self.pipeline = None self.jokes = ["Flux-tastic! ✨", "Small size, big wow! πŸŽ‡"] def load_model(self, model_path: str, config: Optional[DiffusionConfig] = None): try: with st.spinner(f"Loading {model_path}... ⏳ (Flux titan charging!)"): self.pipeline = DiffusionPipeline.from_pretrained(model_path, low_cpu_mem_usage=True) self.pipeline.to("cuda" if torch.cuda.is_available() else "cpu") if config: self.config = config st.success(f"Model loaded! πŸŽ‰ {random.choice(self.jokes)}") logger.info(f"Loaded FLUX.1 Distilled: {model_path}") except Exception as e: st.error(f"Failed to load {model_path}: {str(e)} πŸ’₯ (Flux fizzle!)") logger.error(f"Failed to load {model_path}: {str(e)}") raise return self def fine_tune_sft(self, images, texts, epochs=3): try: dataset = DiffusionDataset(images, texts) dataloader = DataLoader(dataset, batch_size=1, shuffle=True) optimizer = torch.optim.AdamW(self.pipeline.unet.parameters(), lr=1e-5) self.pipeline.unet.train() device = torch.device("cuda" if torch.cuda.is_available() else "cpu") for epoch in range(epochs): with st.spinner(f"Epoch {epoch + 1}/{epochs}... βš™οΈ (Flux titan powering up!)"): total_loss = 0 for batch in dataloader: optimizer.zero_grad() image = batch["image"][0].to(device) text = batch["text"][0] latents = self.pipeline.vae.encode(torch.tensor(np.array(image)).permute(2, 0, 1).unsqueeze(0).float().to(device)).latent_dist.sample() noise = torch.randn_like(latents) timesteps = torch.randint(0, self.pipeline.scheduler.num_train_timesteps, (latents.shape[0],), device=latents.device) noisy_latents = self.pipeline.scheduler.add_noise(latents, noise, timesteps) text_embeddings = self.pipeline.text_encoder(self.pipeline.tokenizer(text, return_tensors="pt").input_ids.to(device))[0] pred_noise = self.pipeline.unet(noisy_latents, timesteps, encoder_hidden_states=text_embeddings).sample loss = torch.nn.functional.mse_loss(pred_noise, noise) loss.backward() optimizer.step() total_loss += loss.item() st.write(f"Epoch {epoch + 1} done! Loss: {total_loss / len(dataloader):.4f}") st.success(f"FLUX Diffusion tuned! πŸŽ‰ {random.choice(self.jokes)}") logger.info(f"Fine-tuned FLUX.1 Distilled: {self.config.name}") except Exception as e: st.error(f"Tuning failed: {str(e)} πŸ’₯ (Flux snag!)") logger.error(f"Tuning failed: {str(e)}") raise return self def save_model(self, path: str): try: with st.spinner("Saving model... πŸ’Ύ (Packing flux magic!)"): os.makedirs(os.path.dirname(path), exist_ok=True) self.pipeline.save_pretrained(path) st.success(f"Saved at {path}! βœ… Flux titan secured!") logger.info(f"Saved at {path}") except Exception as e: st.error(f"Save failed: {str(e)} πŸ’₯ (Fluxy mishap!)") logger.error(f"Save failed: {str(e)}") raise def generate(self, prompt: str): try: return self.pipeline(prompt, num_inference_steps=50).images[0] except Exception as e: st.error(f"Generation failed: {str(e)} πŸ’₯ (Flux oopsie!)") logger.error(f"Generation failed: {str(e)}") raise def generate_filename(sequence, ext="png"): """Time-stamped snapshots ⏰ - Keeps our pics organized with cam flair!""" from datetime import datetime import pytz central = pytz.timezone('US/Central') dt = datetime.now(central) return f"{dt.strftime('%m-%d-%Y-%I-%M-%S-%p')}-{sequence}.{ext}" def get_download_link(file_path, mime_type="text/plain", label="Download"): """Magic link maker πŸ”— - Snag your files with a click!""" try: with open(file_path, 'rb') as f: data = f.read() b64 = base64.b64encode(data).decode() return f'{label} πŸ“₯' except Exception as e: logger.error(f"Failed to generate link for {file_path}: {str(e)}") return f"Error: Could not generate link for {file_path}" def zip_files(files, zip_path): """Zip zap zoo πŸŽ’ - Bundle up your goodies!""" try: with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf: for file in files: zipf.write(file, os.path.basename(file)) logger.info(f"Created ZIP file: {zip_path}") except Exception as e: logger.error(f"Failed to create ZIP {zip_path}: {str(e)}") raise def delete_files(files): """Trash titan πŸ—‘οΈ - Clear the stage for new stars!""" try: for file in files: os.remove(file) logger.info(f"Deleted file: {file}") st.session_state['captured_images'] = [f for f in st.session_state['captured_images'] if f not in files] except Exception as e: logger.error(f"Failed to delete files: {str(e)}") raise def get_model_files(): """Model treasure hunt πŸ—ΊοΈ - Find our diffusion gems!""" return [d for d in glob.glob("diffusion_models/*") if os.path.isdir(d)] def get_gallery_files(file_types): """Gallery curator πŸ–ΌοΈ - Showcase our pixel masterpieces!""" return sorted(list(set(f for ext in file_types for f in glob.glob(f"*.{ext}")))) def update_gallery(): """Gallery refresh 🌟 - Keep the art flowing!""" media_files = get_gallery_files(["png"]) if media_files: cols = st.sidebar.columns(2) for idx, file in enumerate(media_files[:gallery_size * 2]): with cols[idx % 2]: st.image(Image.open(file), caption=file, use_container_width=True) st.markdown(get_download_link(file, "image/png", "Download Snap πŸ“Έ"), unsafe_allow_html=True) def get_available_video_devices(): """Camera roll call πŸŽ₯ - Who’s ready to shine? Fallback if OpenCV flops!""" video_devices = [f"Camera {i} πŸŽ₯" for i in range(6)] # Default to 6 cams try: detected = [] for i in range(6): # Limit to 6 as per your setup cap = cv2.VideoCapture(i, cv2.CAP_V4L2) if not cap.isOpened(): cap = cv2.VideoCapture(i) if cap.isOpened(): detected.append(f"Camera {i} πŸŽ₯") logger.info(f"Detected camera at index {i}") cap.release() if detected: video_devices = detected else: logger.warning("No cameras detected by OpenCV; using defaults") except Exception as e: logger.error(f"Error detecting cameras: {str(e)} - Falling back to defaults") return video_devices st.title("SFT Tiny Titans πŸš€ (Small Diffusion Delight!)") st.sidebar.header("Media Gallery 🎨") gallery_size = st.sidebar.slider("Gallery Size πŸ“Έ", 1, 10, 4, help="How many snaps to flaunt? 🌟") update_gallery() col1, col2 = st.sidebar.columns(2) with col1: if st.button("Download All πŸ“¦"): media_files = get_gallery_files(["png"]) if media_files: zip_path = f"snapshot_collection_{int(time.time())}.zip" zip_files(media_files, zip_path) st.sidebar.markdown(get_download_link(zip_path, "application/zip", "Download All Snaps πŸ“¦"), unsafe_allow_html=True) st.sidebar.success("Snaps zipped! πŸŽ‰ Grab your loot!") else: st.sidebar.warning("No snaps to zip! πŸ“Έ Snap some first!") with col2: if st.button("Delete All πŸ—‘οΈ"): media_files = get_gallery_files(["png"]) if media_files: delete_files(media_files) st.sidebar.success("Snaps vanquished! 🧹 Gallery cleared!") update_gallery() else: st.sidebar.warning("Nothing to delete! πŸ“Έ Snap some pics!") uploaded_files = st.sidebar.file_uploader("Upload Goodies 🎡πŸŽ₯πŸ–ΌοΈπŸ“πŸ“œ", type=["mp3", "mp4", "png", "jpeg", "md", "pdf", "docx"], accept_multiple_files=True) if uploaded_files: for uploaded_file in uploaded_files: filename = uploaded_file.name with open(filename, "wb") as f: f.write(uploaded_file.getvalue()) logger.info(f"Uploaded file: {filename}") st.sidebar.subheader("Image Gallery πŸ–ΌοΈ") image_files = get_gallery_files(["png", "jpeg"]) if image_files: cols = st.sidebar.columns(2) for idx, file in enumerate(image_files[:gallery_size * 2]): with cols[idx % 2]: st.image(Image.open(file), caption=file, use_container_width=True) st.markdown(get_download_link(file, "image/png" if file.endswith(".png") else "image/jpeg", f"Save Pic πŸ–ΌοΈ"), unsafe_allow_html=True) st.sidebar.subheader("Model Management πŸ—‚οΈ") model_dirs = get_model_files() selected_model = st.sidebar.selectbox("Select Saved Model", ["None"] + model_dirs) model_type = st.sidebar.selectbox("Diffusion Type", ["Micro Diffusion", "Latent Diffusion", "FLUX.1 Distilled"]) if selected_model != "None" and st.sidebar.button("Load Model πŸ“‚"): builder = { "Micro Diffusion": MicroDiffusionBuilder, "Latent Diffusion": LatentDiffusionBuilder, "FLUX.1 Distilled": FluxDiffusionBuilder }[model_type]() config = DiffusionConfig(name=os.path.basename(selected_model), base_model="unknown", size="small") try: builder.load_model(selected_model, config) st.session_state['cv_builder'] = builder st.session_state['cv_loaded'] = True st.rerun() except Exception as e: st.error(f"Model load failed: {str(e)} πŸ’₯ (Check logs for details!)") st.sidebar.subheader("Model Status 🚦") st.sidebar.write(f"**CV Model**: {'Loaded' if st.session_state['cv_loaded'] else 'Not Loaded'} {'(Active)' if st.session_state['cv_loaded'] and isinstance(st.session_state.get('cv_builder'), (MicroDiffusionBuilder, LatentDiffusionBuilder, FluxDiffusionBuilder)) else ''}") tabs = ["Build Titan 🌱", "Camera Snap πŸ“·", "Fine-Tune Titan (CV) πŸ”§", "Test Titan (CV) πŸ§ͺ", "Agentic RAG Party (CV) 🌐"] tab1, tab2, tab3, tab4, tab5 = st.tabs(tabs) for i, tab in enumerate(tabs): if st.session_state['active_tab'] != tab and st.session_state.get(f'tab{i}_active', False): logger.info(f"Switched to tab: {tab}") st.session_state['active_tab'] = tab st.session_state[f'tab{i}_active'] = (st.session_state['active_tab'] == tab) with tab1: st.header("Build Titan 🌱") model_type = st.selectbox("Diffusion Type", ["Micro Diffusion", "Latent Diffusion", "FLUX.1 Distilled"], key="build_type") base_model = st.selectbox("Select Tiny Model", ["CompVis/ldm-text2im-large-256" if model_type == "Micro Diffusion" else "runwayml/stable-diffusion-v1-5" if model_type == "Latent Diffusion" else "black-forest-labs/flux.1-distilled"]) model_name = st.text_input("Model Name", f"tiny-titan-{int(time.time())}") if st.button("Download Model ⬇️"): config = DiffusionConfig(name=model_name, base_model=base_model, size="small") builder = { "Micro Diffusion": MicroDiffusionBuilder, "Latent Diffusion": LatentDiffusionBuilder, "FLUX.1 Distilled": FluxDiffusionBuilder }[model_type]() try: builder.load_model(base_model, config) builder.save_model(config.model_path) st.session_state['cv_builder'] = builder st.session_state['cv_loaded'] = True st.rerun() except Exception as e: st.error(f"Model build failed: {str(e)} πŸ’₯ (Check logs for details!)") with tab2: st.header("Camera Snap πŸ“· (Dual Capture Fiesta!)") video_devices = get_available_video_devices() st.write(f"πŸŽ‰ Detected Cameras: {', '.join(video_devices)}") st.info("Switch cams in your browser settings (e.g., Chrome > Privacy > Camera) since I’m a browser star! 🌟") st.subheader("Camera 0 🎬 - Lights, Camera, Action!") cam0_cols = st.columns(4) with cam0_cols[0]: cam0_device = st.selectbox("Cam πŸ“·", video_devices, index=0, key="cam0_device", help="Pick your star cam! 🌟") with cam0_cols[1]: cam0_label = st.text_input("Tag 🏷️", "Cam 0 Snap", key="cam0_label", help="Name your masterpiece! 🎨") with cam0_cols[2]: cam0_help = st.text_input("Hint πŸ’‘", "Snap a heroic moment! πŸ¦Έβ€β™‚οΈ", key="cam0_help", help="Give a fun tip!") with cam0_cols[3]: cam0_vis = st.selectbox("Show πŸ–ΌοΈ", ["visible", "hidden", "collapsed"], index=0, key="cam0_vis", help="Label vibes: Visible, Sneaky, or Gone!") st.subheader("Camera 1 πŸŽ₯ - Roll the Film!") cam1_cols = st.columns(4) with cam1_cols[0]: cam1_device = st.selectbox("Cam πŸ“·", video_devices, index=1 if len(video_devices) > 1 else 0, key="cam1_device", help="Choose your blockbuster cam! 🎬") with cam1_cols[1]: cam1_label = st.text_input("Tag 🏷️", "Cam 1 Snap", key="cam1_label", help="Title your epic shot! 🌠") with cam1_cols[2]: cam1_help = st.text_input("Hint πŸ’‘", "Grab an epic frame! 🌟", key="cam1_help", help="Drop a cheeky hint!") with cam1_cols[3]: cam1_vis = st.selectbox("Show πŸ–ΌοΈ", ["visible", "hidden", "collapsed"], index=0, key="cam1_vis", help="Label style: Show it, Hide it, Poof!") cols = st.columns(2) with cols[0]: st.subheader(f"Camera 0 ({cam0_device}) 🎬") cam0_img = st.camera_input( label=cam0_label, key="cam0", help=cam0_help, disabled=False, label_visibility=cam0_vis ) if cam0_img: filename = generate_filename("cam0") with open(filename, "wb") as f: f.write(cam0_img.getvalue()) st.image(Image.open(filename), caption=filename, use_container_width=True) logger.info(f"Saved snapshot from Camera 0: {filename}") st.session_state['captured_images'].append(filename) update_gallery() st.info("🚨 One snap at a timeβ€”your Titan’s too cool for bursts! 😎") with cols[1]: st.subheader(f"Camera 1 ({cam1_device}) πŸŽ₯") cam1_img = st.camera_input( label=cam1_label, key="cam1", help=cam1_help, disabled=False, label_visibility=cam1_vis ) if cam1_img: filename = generate_filename("cam1") with open(filename, "wb") as f: f.write(cam1_img.getvalue()) st.image(Image.open(filename), caption=filename, use_container_width=True) logger.info(f"Saved snapshot from Camera 1: {filename}") st.session_state['captured_images'].append(filename) update_gallery() st.info("🚨 Single shots onlyβ€”craft your masterpiece! 🎨") with tab3: st.header("Fine-Tune Titan (CV) πŸ”§ (Sculpt Your Pixel Prodigy!)") if not st.session_state['cv_loaded'] or not isinstance(st.session_state['cv_builder'], (MicroDiffusionBuilder, LatentDiffusionBuilder, FluxDiffusionBuilder)): st.warning("Please build or load a CV Titan first! ⚠️ (No artist, no canvas!)") else: captured_images = get_gallery_files(["png"]) if len(captured_images) >= 2: st.subheader("Use Case 1: Denoise Snapshots 🌟") denoising_data = [{"image": img, "text": f"Denoised {os.path.basename(img).split('-')[4]} snap"} for img in captured_images[:min(len(captured_images), 10)]] denoising_edited = st.data_editor(pd.DataFrame(denoising_data), num_rows="dynamic", help="Craft denoising pairs! 🌟") if st.button("Fine-Tune Denoising πŸ”„"): images = [Image.open(row["image"]) for _, row in denoising_edited.iterrows()] texts = [row["text"] for _, row in denoising_edited.iterrows()] new_model_name = f"{st.session_state['cv_builder'].config.name}-denoise-{int(time.time())}" new_config = DiffusionConfig(name=new_model_name, base_model=st.session_state['cv_builder'].config.base_model, size="small") st.session_state['cv_builder'].config = new_config with st.status("Fine-tuning for denoising... ⏳ (Polishing pixels!)", expanded=True) as status: st.session_state['cv_builder'].fine_tune_sft(images, texts) st.session_state['cv_builder'].save_model(new_config.model_path) status.update(label="Denoising tuned! πŸŽ‰ (Pixel shine unleashed!)", state="complete") zip_path = f"{new_config.model_path}.zip" zip_files([new_config.model_path], zip_path) st.markdown(get_download_link(zip_path, "application/zip", "Download Denoised Titan πŸ“¦"), unsafe_allow_html=True) denoising_csv = f"denoise_dataset_{int(time.time())}.csv" with open(denoising_csv, "w", newline="") as f: writer = csv.writer(f) writer.writerow(["image", "text"]) for _, row in denoising_edited.iterrows(): writer.writerow([row["image"], row["text"]]) st.markdown(get_download_link(denoising_csv, "text/csv", "Download Denoising CSV πŸ“œ"), unsafe_allow_html=True) st.subheader("Use Case 2: Stylize Snapshots 🎨") stylize_data = [{"image": img, "text": f"Neon {os.path.basename(img).split('-')[4]} style"} for img in captured_images[:min(len(captured_images), 10)]] stylize_edited = st.data_editor(pd.DataFrame(stylize_data), num_rows="dynamic", help="Craft stylized pairs! 🎨") if st.button("Fine-Tune Stylization πŸ”„"): images = [Image.open(row["image"]) for _, row in stylize_edited.iterrows()] texts = [row["text"] for _, row in stylize_edited.iterrows()] new_model_name = f"{st.session_state['cv_builder'].config.name}-stylize-{int(time.time())}" new_config = DiffusionConfig(name=new_model_name, base_model=st.session_state['cv_builder'].config.base_model, size="small") st.session_state['cv_builder'].config = new_config with st.status("Fine-tuning for stylization... ⏳ (Painting pixels!)", expanded=True) as status: st.session_state['cv_builder'].fine_tune_sft(images, texts) st.session_state['cv_builder'].save_model(new_config.model_path) status.update(label="Stylization tuned! πŸŽ‰ (Pixel art unleashed!)", state="complete") zip_path = f"{new_config.model_path}.zip" zip_files([new_config.model_path], zip_path) st.markdown(get_download_link(zip_path, "application/zip", "Download Stylized Titan πŸ“¦"), unsafe_allow_html=True) stylize_md = f"stylize_dataset_{int(time.time())}.md" with open(stylize_md, "w") as f: f.write("# Stylization Dataset\n\n") for _, row in stylize_edited.iterrows(): f.write(f"- `{row['image']}`: {row['text']}\n") st.markdown(get_download_link(stylize_md, "text/markdown", "Download Stylization MD πŸ“"), unsafe_allow_html=True) st.subheader("Use Case 3: Multi-Angle Snapshots 🌐") multiangle_data = [{"image": img, "text": f"View from {os.path.basename(img).split('-')[4]}"} for img in captured_images[:min(len(captured_images), 10)]] multiangle_edited = st.data_editor(pd.DataFrame(multiangle_data), num_rows="dynamic", help="Craft multi-angle pairs! 🌐") if st.button("Fine-Tune Multi-Angle πŸ”„"): images = [Image.open(row["image"]) for _, row in multiangle_edited.iterrows()] texts = [row["text"] for _, row in multiangle_edited.iterrows()] new_model_name = f"{st.session_state['cv_builder'].config.name}-multiangle-{int(time.time())}" new_config = DiffusionConfig(name=new_model_name, base_model=st.session_state['cv_builder'].config.base_model, size="small") st.session_state['cv_builder'].config = new_config with st.status("Fine-tuning for multi-angle... ⏳ (Spinning pixels!)", expanded=True) as status: st.session_state['cv_builder'].fine_tune_sft(images, texts) st.session_state['cv_builder'].save_model(new_config.model_path) status.update(label="Multi-angle tuned! πŸŽ‰ (Pixel views unleashed!)", state="complete") zip_path = f"{new_config.model_path}.zip" zip_files([new_config.model_path], zip_path) st.markdown(get_download_link(zip_path, "application/zip", "Download Multi-Angle Titan πŸ“¦"), unsafe_allow_html=True) multiangle_csv = f"multiangle_dataset_{int(time.time())}.csv" with open(multiangle_csv, "w", newline="") as f: writer = csv.writer(f) writer.writerow(["image", "text"]) for _, row in multiangle_edited.iterrows(): writer.writerow([row["image"], row["text"]]) st.markdown(get_download_link(multiangle_csv, "text/csv", "Download Multi-Angle CSV πŸ“œ"), unsafe_allow_html=True) with tab4: st.header("Test Titan (CV) πŸ§ͺ (Unleash Your Pixel Power!)") if not st.session_state['cv_loaded'] or not isinstance(st.session_state['cv_builder'], (MicroDiffusionBuilder, LatentDiffusionBuilder, FluxDiffusionBuilder)): st.warning("Please build or load a CV Titan first! ⚠️ (No artist, no masterpiece!)") else: st.subheader("Test Your Titan 🎨") test_prompt = st.text_area("Prompt 🎀", "Neon glow from cam0", help="Dream up a wild imageβ€”your Titan’s ready to paint! πŸ–ŒοΈ") if st.button("Generate ▢️"): with st.spinner("Crafting your masterpiece... ⏳ (Titan’s mixing pixels!)"): image = st.session_state['cv_builder'].generate(test_prompt) st.image(image, caption=f"Generated: {test_prompt}", use_container_width=True) with tab5: st.header("Agentic RAG Party (CV) 🌐 (Pixel Party Extravaganza!)") st.write("Generate superhero party vibes from your tuned Titan! πŸŽ‰") if not st.session_state['cv_loaded'] or not isinstance(st.session_state['cv_builder'], (MicroDiffusionBuilder, LatentDiffusionBuilder, FluxDiffusionBuilder)): st.warning("Please build or load a CV Titan first! ⚠️ (No artist, no party!)") else: if st.button("Run RAG Demo πŸŽ‰"): with st.spinner("Loading your pixel party titan... ⏳ (Titan’s grabbing its brush!)"): class CVPartyAgent: def __init__(self, pipeline): self.pipeline = pipeline def generate(self, prompt: str) -> Image.Image: return self.pipeline(prompt, num_inference_steps=50).images[0] def plan_party(self): prompts = [ "Gold-plated Batman statue from cam0", "VR superhero battle scene from cam1", "Neon-lit Avengers tower from cam2" ] data = [{"Theme": f"Scene {i+1}", "Image Idea": prompt} for i, prompt in enumerate(prompts)] return pd.DataFrame(data) agent = CVPartyAgent(st.session_state['cv_builder'].pipeline) st.write("Party agent ready! 🎨 (Time to paint an epic bash!)") with st.spinner("Crafting superhero party visuals... ⏳ (Pixels assemble!)"): try: plan_df = agent.plan_party() st.dataframe(plan_df) for _, row in plan_df.iterrows(): image = agent.generate(row["Image Idea"]) st.image(image, caption=f"{row['Theme']} - {row['Image Idea']}", use_container_width=True) except Exception as e: st.error(f"Party crashed: {str(e)} πŸ’₯ (Pixel oopsie!)") logger.error(f"RAG demo failed: {str(e)}") st.sidebar.subheader("Action Logs πŸ“œ") log_container = st.sidebar.empty() with log_container: for record in log_records: st.write(f"{record.asctime} - {record.levelname} - {record.message}") update_gallery()