#!/usr/bin/env python3
# SFT Tiny Titans — a Streamlit playground for supervised fine-tuning (SFT) of
# small NLP and diffusion models. Launch with: streamlit run <this file>
import os
import base64
import csv
import time
from dataclasses import dataclass

import streamlit as st

st.set_page_config(
    page_title="SFT Tiny Titans 🚀",
    page_icon="🤖",
    layout="wide",
    initial_sidebar_state="expanded",
)


# Model Configurations
@dataclass
class ModelConfig:
    name: str
    base_model: str
    model_type: str = "causal_lm"

    @property
    def model_path(self):
        return f"models/{self.name}"


@dataclass
class DiffusionConfig:
    name: str
    base_model: str

    @property
    def model_path(self):
        return f"diffusion_models/{self.name}"


# Lazy-loaded Builders (heavy imports live inside methods so the app starts fast)
class ModelBuilder:
    def __init__(self):
        self.config = None
        self.model = None
        self.tokenizer = None

    def load_model(self, model_path: str, config: ModelConfig):
        from transformers import AutoModelForCausalLM, AutoTokenizer
        import torch
        self.model = AutoModelForCausalLM.from_pretrained(model_path)
        self.tokenizer = AutoTokenizer.from_pretrained(model_path)
        if self.tokenizer.pad_token is None:
            # Causal LMs often ship without a pad token; reuse EOS for padding.
            self.tokenizer.pad_token = self.tokenizer.eos_token
        self.config = config
        self.model.to(torch.device("cuda" if torch.cuda.is_available() else "cpu"))

    def evaluate(self, prompt: str):
        import torch
        self.model.eval()
        with torch.no_grad():
            inputs = self.tokenizer(prompt, return_tensors="pt", max_length=128, truncation=True).to(self.model.device)
            outputs = self.model.generate(**inputs, max_new_tokens=50)
            return self.tokenizer.decode(outputs[0], skip_special_tokens=True)


class DiffusionBuilder:
    def __init__(self):
        self.config = None
        self.pipeline = None

    def load_model(self, model_path: str, config: DiffusionConfig):
        from diffusers import StableDiffusionPipeline
        import torch
        self.pipeline = StableDiffusionPipeline.from_pretrained(model_path)
        self.pipeline.to(torch.device("cuda" if torch.cuda.is_available() else "cpu"))
        self.config = config

    def generate(self, prompt: str):
        return self.pipeline(prompt, num_inference_steps=20).images[0]


# Utilities
def get_download_link(file_path, mime_type="text/plain", label="Download"):
    """Return an HTML <a> tag with the file inlined as a base64 data URI."""
    with open(file_path, 'rb') as f:
        data = f.read()
    b64 = base64.b64encode(data).decode()
    return f'<a href="data:{mime_type};base64,{b64}" download="{os.path.basename(file_path)}">{label} 📥</a>'


def generate_filename(text_line):
    from datetime import datetime
    import pytz
    central = pytz.timezone('US/Central')
    timestamp = datetime.now(central).strftime("%Y%m%d_%I%M%S_%p")
    safe_text = ''.join(c if c.isalnum() else '_' for c in text_line[:50])
    return f"{timestamp}_{safe_text}.png"


def get_gallery_files(file_types):
    import glob
    return sorted([f for ext in file_types for f in glob.glob(f"*.{ext}")])


# Video Processor for WebRTC
class VideoSnapshot:
    """Keeps the most recent frame (as a PIL image) so it can be saved on demand."""
    def __init__(self):
        self.snapshot = None

    def recv(self, frame):
        # Called once per incoming av.VideoFrame; pass the frame through unchanged.
        self.snapshot = frame.to_image()
        return frame

    def take_snapshot(self):
        return self.snapshot


# Main App
st.title("SFT Tiny Titans 🚀 (Fast & Fixed!)")

# Sidebar Galleries
st.sidebar.header("Media Gallery 🎨")
for gallery_type, file_types, emoji in [("Images 📸", ["png", "jpg", "jpeg"], "🖼️"), ("Videos 🎥", ["mp4"], "🎬")]:
    st.sidebar.subheader(f"{gallery_type} {emoji}")
    files = get_gallery_files(file_types)
    if files:
        cols = st.sidebar.columns(2)
        for idx, file in enumerate(files[:4]):
            with cols[idx % 2]:
                if "Images" in gallery_type:
                    from PIL import Image
                    st.image(Image.open(file), caption=file.split('/')[-1], use_container_width=True)
                elif "Videos" in gallery_type:
                    st.video(file)

# Sidebar Model Management
st.sidebar.subheader("Model Hub 🗂️")
model_type = st.sidebar.selectbox("Model Type", ["NLP (Causal LM)", "CV (Diffusion)"])
"HuggingFaceTB/SmolLM-135M", "CV (Diffusion)": "CompVis/stable-diffusion-v1-4"} selected_model = st.sidebar.selectbox("Select Model", ["None", model_options[model_type]]) if selected_model != "None" and st.sidebar.button("Load Model πŸ“‚"): builder = ModelBuilder() if "NLP" in model_type else DiffusionBuilder() config = (ModelConfig if "NLP" in model_type else DiffusionConfig)(name=f"titan_{int(time.time())}", base_model=selected_model) with st.spinner("Loading... ⏳"): builder.load_model(selected_model, config) st.session_state['builder'] = builder st.session_state['model_loaded'] = True # Tabs tab1, tab2, tab3, tab4 = st.tabs(["Build Titan 🌱", "Fine-Tune Titans πŸ”§", "Test Titans πŸ§ͺ", "Camera Snap πŸ“·"]) with tab1: st.header("Build Titan 🌱 (Quick Start!)") model_type = st.selectbox("Model Type", ["NLP (Causal LM)", "CV (Diffusion)"], key="build_type") base_model = st.selectbox("Select Model", [model_options[model_type]], key="build_model") if st.button("Download Model ⬇️"): config = (ModelConfig if "NLP" in model_type else DiffusionConfig)(name=f"titan_{int(time.time())}", base_model=base_model) builder = ModelBuilder() if "NLP" in model_type else DiffusionBuilder() with st.spinner("Fetching... ⏳"): builder.load_model(base_model, config) st.session_state['builder'] = builder st.session_state['model_loaded'] = True st.success("Titan up! πŸŽ‰") with tab2: st.header("Fine-Tune Titans πŸ”§ (Tune Fast!)") if 'builder' not in st.session_state or not st.session_state.get('model_loaded', False): st.warning("Load a Titan first! ⚠️") else: if isinstance(st.session_state['builder'], ModelBuilder): st.subheader("NLP Tune 🧠") uploaded_csv = st.file_uploader("Upload CSV", type="csv", key="nlp_csv") if uploaded_csv and st.button("Tune NLP πŸ”„"): from torch.utils.data import Dataset, DataLoader import torch class SFTDataset(Dataset): def __init__(self, data, tokenizer): self.data = data self.tokenizer = tokenizer def __len__(self): return len(self.data) def __getitem__(self, idx): prompt = self.data[idx]["prompt"] response = self.data[idx]["response"] inputs = self.tokenizer(f"{prompt} {response}", return_tensors="pt", padding="max_length", max_length=128, truncation=True) labels = inputs["input_ids"].clone() labels[0, :len(self.tokenizer(prompt)["input_ids"][0])] = -100 return {"input_ids": inputs["input_ids"][0], "attention_mask": inputs["attention_mask"][0], "labels": labels[0]} data = [] with open("temp.csv", "wb") as f: f.write(uploaded_csv.read()) with open("temp.csv", "r") as f: reader = csv.DictReader(f) for row in reader: data.append({"prompt": row["prompt"], "response": row["response"]}) dataset = SFTDataset(data, st.session_state['builder'].tokenizer) dataloader = DataLoader(dataset, batch_size=2) optimizer = torch.optim.AdamW(st.session_state['builder'].model.parameters(), lr=2e-5) st.session_state['builder'].model.train() for _ in range(1): for batch in dataloader: optimizer.zero_grad() outputs = st.session_state['builder'].model(**{k: v.to(st.session_state['builder'].model.device) for k, v in batch.items()}) outputs.loss.backward() optimizer.step() st.success("NLP sharpened! 
πŸŽ‰") elif isinstance(st.session_state['builder'], DiffusionBuilder): st.subheader("CV Tune 🎨") uploaded_files = st.file_uploader("Upload Images", type=["png", "jpg"], accept_multiple_files=True, key="cv_upload") text_input = st.text_area("Text (one per image)", "Bat Neon\nIron Glow", key="cv_text") if uploaded_files and st.button("Tune CV πŸ”„"): import torch from PIL import Image import numpy as np images = [Image.open(f).convert("RGB") for f in uploaded_files] texts = text_input.splitlines()[:len(images)] optimizer = torch.optim.AdamW(st.session_state['builder'].pipeline.unet.parameters(), lr=1e-5) st.session_state['builder'].pipeline.unet.train() for _ in range(1): for img, text in zip(images, texts): optimizer.zero_grad() latents = st.session_state['builder'].pipeline.vae.encode(torch.tensor(np.array(img)).permute(2, 0, 1).unsqueeze(0).float().to(st.session_state['builder'].pipeline.device)).latent_dist.sample() noise = torch.randn_like(latents) timesteps = torch.randint(0, 1000, (1,), device=latents.device) noisy_latents = st.session_state['builder'].pipeline.scheduler.add_noise(latents, noise, timesteps) text_emb = st.session_state['builder'].pipeline.text_encoder(st.session_state['builder'].pipeline.tokenizer(text, return_tensors="pt").input_ids.to(st.session_state['builder'].pipeline.device))[0] pred_noise = st.session_state['builder'].pipeline.unet(noisy_latents, timesteps, encoder_hidden_states=text_emb).sample loss = torch.nn.functional.mse_loss(pred_noise, noise) loss.backward() optimizer.step() for img, text in zip(images, texts): filename = generate_filename(text) img.save(filename) st.success("CV polished! πŸŽ‰") with tab3: st.header("Test Titans πŸ§ͺ (Quick Check!)") if 'builder' not in st.session_state or not st.session_state.get('model_loaded', False): st.warning("Load a Titan first! ⚠️") else: if isinstance(st.session_state['builder'], ModelBuilder): st.subheader("NLP Test 🧠") prompt = st.text_area("Prompt", "What’s a superhero?", key="nlp_test") if st.button("Test NLP ▢️"): result = st.session_state['builder'].evaluate(prompt) st.write(f"**Answer**: {result}") elif isinstance(st.session_state['builder'], DiffusionBuilder): st.subheader("CV Test 🎨") prompt = st.text_area("Prompt", "Neon Batman", key="cv_test") if st.button("Test CV ▢️"): with st.spinner("Generating... ⏳"): img = st.session_state['builder'].generate(prompt) st.image(img, caption="Generated Art") with tab4: st.header("Camera Snap πŸ“· (Instant Shots!)") from streamlit_webrtc import webrtc_streamer ctx = webrtc_streamer( key="camera", video_processor_factory=VideoSnapshot, frontend_rtc_configuration={"iceServers": [{"urls": ["stun:stun.l.google.com:19302"]}]} ) if ctx.video_processor: snapshot_text = st.text_input("Snapshot Text", "Live Snap") if st.button("Snap It! πŸ“Έ"): snapshot = ctx.video_processor.take_snapshot() if snapshot: filename = generate_filename(snapshot_text) snapshot.save(filename) st.image(snapshot, caption=filename, use_container_width=True) st.success("Snapped! πŸŽ‰") # Demo Dataset st.subheader("Demo CV Dataset 🎨") demo_texts = ["Bat Neon", "Iron Glow"] demo_images = [generate_filename(t) for t in demo_texts] for img, text in zip(demo_images, demo_texts): if not os.path.exists(img): from PIL import Image Image.new("RGB", (100, 100)).save(img) st.code("\n".join([f"{i+1}. 
{t} -> {img}" for i, (t, img) in enumerate(zip(demo_texts, demo_images))]), language="text") if st.button("Download Demo CSV πŸ“"): csv_path = f"demo_cv_{int(time.time())}.csv" with open(csv_path, "w", newline="") as f: writer = csv.writer(f) writer.writerow(["image", "text"]) for img, text in zip(demo_images, demo_texts): writer.writerow([img, text]) st.markdown(get_download_link(csv_path, "text/csv", "Download Demo CSV"), unsafe_allow_html=True)