#!/usr/bin/env python3
"""SFT Tiny Titans: a Streamlit app to load, fine-tune and test small models.

The script renders, top to bottom on every Streamlit rerun:

* a sidebar with a media gallery and a model loader,
* a "Build" tab that downloads/loads a base model,
* a "Fine-Tune" tab with a minimal supervised loop for either a causal LM
  (CSV with ``prompt``/``response`` columns) or a diffusion UNet
  (images plus one caption per image),
* a "Test" tab that runs the loaded model on a prompt,
* a "Camera Snap" tab that grabs a frame from a WebRTC stream and offers a
  small demo dataset for download.

Heavy libraries (``torch``, ``transformers``, ``diffusers``) are imported
lazily inside the functions that need them to keep startup fast.

NOTE(review): the emoji inside the UI string literals look mis-encoded
(mojibake). They are reproduced unchanged here; confirm the intended glyphs
and re-save the file as UTF-8.
"""
import base64
import csv
import glob
import html
import io
import os
import time
from dataclasses import dataclass
from datetime import datetime

import av
import numpy as np
import pandas as pd
import pytz
import streamlit as st
from PIL import Image
from streamlit_webrtc import webrtc_streamer, VideoTransformerBase

# Minimal initial imports to reduce startup delay
st.set_page_config(
    page_title="SFT Tiny Titans πŸš€",
    page_icon="πŸ€–",
    layout="wide",
    initial_sidebar_state="expanded",
)

# Model-type labels and the checkpoints offered for each of them.
MODEL_TYPES = ["NLP (Causal LM)", "CV (Diffusion)"]
NLP_MODELS = ["HuggingFaceTB/SmolLM-135M", "Qwen/Qwen1.5-0.5B-Chat"]
CV_MODELS = ["stabilityai/stable-diffusion-2-1", "CompVis/stable-diffusion-v1-4"]


# Model Configurations
@dataclass
class ModelConfig:
    """Identity of a causal-LM "titan": a local name plus its base checkpoint."""

    name: str
    base_model: str
    model_type: str = "causal_lm"

    @property
    def model_path(self):
        """Return the relative directory derived from ``name``."""
        return f"models/{self.name}"


@dataclass
class DiffusionConfig:
    """Identity of a diffusion "titan": a local name plus its base checkpoint."""

    name: str
    base_model: str

    @property
    def model_path(self):
        """Return the relative directory derived from ``name``."""
        return f"diffusion_models/{self.name}"


# Lazy-loaded Builders
class ModelBuilder:
    """Holds a causal language model and its tokenizer."""

    def __init__(self):
        self.config = None
        self.model = None
        self.tokenizer = None

    def load_model(self, model_path: str, config: ModelConfig):
        """Load model and tokenizer from ``model_path`` and move the model to GPU if available."""
        from transformers import AutoModelForCausalLM, AutoTokenizer
        import torch

        self.model = AutoModelForCausalLM.from_pretrained(model_path)
        self.tokenizer = AutoTokenizer.from_pretrained(model_path)
        # Padding is required for batched fine-tuning; reuse EOS when the
        # checkpoint does not define a dedicated pad token.
        if self.tokenizer.pad_token is None:
            self.tokenizer.pad_token = self.tokenizer.eos_token
        self.config = config
        self.model.to(torch.device("cuda" if torch.cuda.is_available() else "cpu"))

    def evaluate(self, prompt: str):
        """Generate up to 50 new tokens for ``prompt`` and return the decoded text."""
        import torch

        self.model.eval()
        with torch.no_grad():
            inputs = self.tokenizer(
                prompt, return_tensors="pt", max_length=128, truncation=True
            ).to(self.model.device)
            outputs = self.model.generate(**inputs, max_new_tokens=50)
            return self.tokenizer.decode(outputs[0], skip_special_tokens=True)


class DiffusionBuilder:
    """Holds a Stable Diffusion pipeline."""

    def __init__(self):
        self.config = None
        self.pipeline = None

    def load_model(self, model_path: str, config: DiffusionConfig):
        """Load the pipeline from ``model_path`` and move it to GPU if available."""
        from diffusers import StableDiffusionPipeline
        import torch

        self.pipeline = StableDiffusionPipeline.from_pretrained(model_path)
        self.pipeline.to(torch.device("cuda" if torch.cuda.is_available() else "cpu"))
        self.config = config

    def generate(self, prompt: str):
        """Run 20 inference steps for ``prompt`` and return the first image."""
        return self.pipeline(prompt, num_inference_steps=20).images[0]


# Utilities
def get_download_link(file_path, mime_type="text/plain", label="Download"):
    """Return an HTML anchor that downloads ``file_path`` via a base64 data URI.

    The result is meant for ``st.markdown(..., unsafe_allow_html=True)``.
    The file name and label are HTML-escaped before being embedded.
    """
    with open(file_path, 'rb') as f:
        data = f.read()
    b64 = base64.b64encode(data).decode()
    file_name = html.escape(os.path.basename(file_path), quote=True)
    return (
        f'<a href="data:{mime_type};base64,{b64}" download="{file_name}">'
        f'{html.escape(label)} πŸ“₯</a>'
    )


def generate_filename(text_line):
    """Return ``<US/Central timestamp>_<sanitised text>.png``.

    Only the first 50 characters of ``text_line`` are used and every
    non-alphanumeric character is replaced by an underscore.
    """
    central = pytz.timezone('US/Central')
    timestamp = datetime.now(central).strftime("%Y%m%d_%I%M%S_%p")
    safe_text = ''.join(c if c.isalnum() else '_' for c in text_line[:50])
    return f"{timestamp}_{safe_text}.png"


def get_gallery_files(file_types):
    """Return the sorted names of files in the working directory with the given extensions."""
    return sorted(f for ext in file_types for f in glob.glob(f"*.{ext}"))


def get_model_options(model_type):
    """Return the checkpoints offered for a model-type label."""
    return NLP_MODELS if "NLP" in model_type else CV_MODELS


def load_titan(model_type, base_model, spinner_text):
    """Create the matching builder, load ``base_model`` and store it in session state."""
    is_nlp = "NLP" in model_type
    config_cls = ModelConfig if is_nlp else DiffusionConfig
    config = config_cls(name=f"titan_{int(time.time())}", base_model=base_model)
    builder = ModelBuilder() if is_nlp else DiffusionBuilder()
    with st.spinner(spinner_text):
        builder.load_model(base_model, config)
    st.session_state['builder'] = builder
    st.session_state['model_loaded'] = True


def read_sft_rows(uploaded_csv):
    """Parse an uploaded CSV in memory into ``[{"prompt": ..., "response": ...}]``.

    Raises:
        ValueError: if the ``prompt`` or ``response`` column is missing.
    """
    # Decode in memory instead of going through a shared temp file; utf-8-sig
    # also strips a BOM that spreadsheet exports commonly prepend.
    text = uploaded_csv.getvalue().decode("utf-8-sig")
    reader = csv.DictReader(io.StringIO(text, newline=""))
    missing = {"prompt", "response"} - set(reader.fieldnames or [])
    if missing:
        raise ValueError(f"CSV is missing required column(s): {', '.join(sorted(missing))}")
    return [{"prompt": row["prompt"], "response": row["response"]} for row in reader]


def tune_nlp(builder, data, epochs=3):
    """Run a minimal supervised fine-tuning loop on ``builder.model``.

    Args:
        builder: a loaded ``ModelBuilder``.
        data: list of dicts with ``prompt`` and ``response`` strings.
        epochs: number of passes over ``data``.
    """
    from torch.utils.data import Dataset, DataLoader
    import torch

    class SFTDataset(Dataset):
        """Tokenises "prompt response" pairs; loss is computed on the response only."""

        def __init__(self, data, tokenizer):
            self.data = data
            self.tokenizer = tokenizer

        def __len__(self):
            return len(self.data)

        def __getitem__(self, idx):
            prompt = self.data[idx]["prompt"]
            response = self.data[idx]["response"]
            inputs = self.tokenizer(
                f"{prompt} {response}",
                return_tensors="pt",
                padding="max_length",
                max_length=128,
                truncation=True,
            )
            attention_mask = inputs["attention_mask"]
            labels = inputs["input_ids"].clone()
            # Without return_tensors the tokenizer returns a flat list of ids,
            # so its length is the prompt's token count.
            prompt_len = len(self.tokenizer(prompt)["input_ids"])
            # With left padding the prompt starts after the pad tokens.
            pad_count = int((attention_mask[0] == 0).sum())
            offset = pad_count if self.tokenizer.padding_side == "left" else 0
            labels[0, offset:offset + prompt_len] = -100
            # Padding reuses the EOS id, so exclude it from the loss explicitly.
            labels[attention_mask == 0] = -100
            return {
                "input_ids": inputs["input_ids"][0],
                "attention_mask": attention_mask[0],
                "labels": labels[0],
            }

    model = builder.model
    dataloader = DataLoader(SFTDataset(data, builder.tokenizer), batch_size=2)
    optimizer = torch.optim.AdamW(model.parameters(), lr=2e-5)
    model.train()
    for _ in range(epochs):  # Simplified epochs
        for batch in dataloader:
            optimizer.zero_grad()
            outputs = model(**{k: v.to(model.device) for k, v in batch.items()})
            outputs.loss.backward()
            optimizer.step()
    model.eval()


def tune_cv(builder, images, texts, epochs=3):
    """Run a minimal noise-prediction fine-tuning loop on the pipeline's UNet.

    Args:
        builder: a loaded ``DiffusionBuilder``.
        images: RGB ``PIL.Image`` objects.
        texts: one caption per image; extra images or captions are ignored.
        epochs: number of passes over the pairs.

    NOTE(review): the loss targets the added noise, which assumes an
    epsilon-prediction checkpoint; images are used at their uploaded size and
    presumably need dimensions the UNet accepts - confirm for the offered
    checkpoints.
    """
    import torch

    pipe = builder.pipeline
    # Older diffusers configs may lack the field; 0.18215 is the SD v1 value.
    scaling = getattr(pipe.vae.config, "scaling_factor", 0.18215)
    optimizer = torch.optim.AdamW(pipe.unet.parameters(), lr=1e-5)
    pipe.unet.train()
    for _ in range(epochs):  # Simplified epochs
        for img, text in zip(images, texts):
            optimizer.zero_grad()
            pixels = torch.from_numpy(np.array(img)).permute(2, 0, 1).unsqueeze(0).float()
            # The VAE expects inputs in [-1, 1], not raw 0..255 bytes.
            pixels = (pixels / 127.5 - 1.0).to(pipe.device)
            # Only the UNet is optimised, so skip autograd for the frozen parts.
            with torch.no_grad():
                latents = pipe.vae.encode(pixels).latent_dist.sample() * scaling
                token_ids = pipe.tokenizer(
                    text,
                    return_tensors="pt",
                    padding="max_length",
                    truncation=True,
                    max_length=pipe.tokenizer.model_max_length,
                ).input_ids.to(pipe.device)
                text_emb = pipe.text_encoder(token_ids)[0]
            noise = torch.randn_like(latents)
            timesteps = torch.randint(0, 1000, (1,), device=latents.device)
            noisy_latents = pipe.scheduler.add_noise(latents, noise, timesteps)
            pred_noise = pipe.unet(noisy_latents, timesteps, encoder_hidden_states=text_emb).sample
            loss = torch.nn.functional.mse_loss(pred_noise, noise)
            loss.backward()
            optimizer.step()
    pipe.unet.eval()


# Video Transformer for WebRTC
class VideoSnapshot(VideoTransformerBase):
    """Passes frames through unchanged while remembering the most recent one."""

    def __init__(self):
        self.snapshot = None

    def transform(self, frame):
        """Store the frame as a BGR array and return it unmodified."""
        img = frame.to_ndarray(format="bgr24")
        self.snapshot = img
        return img

    def take_snapshot(self):
        """Return the latest frame as an RGB ``PIL.Image``, or ``None`` if none was seen."""
        if self.snapshot is None:
            return None
        # Frames are stored as BGR; PIL expects RGB channel order.
        rgb = np.ascontiguousarray(self.snapshot[:, :, ::-1])
        return Image.fromarray(rgb)


# Main App
st.title("SFT Tiny Titans πŸš€ (Lean & Mean!)")

# Sidebar Galleries
st.sidebar.header("Media Gallery 🎨")
for gallery_type, file_types, emoji in [
    ("Images πŸ“Έ", ["png", "jpg", "jpeg"], "πŸ–ΌοΈ"),
    ("Videos πŸŽ₯", ["mp4"], "🎬"),
]:
    st.sidebar.subheader(f"{gallery_type} {emoji}")
    files = get_gallery_files(file_types)
    if files:
        cols = st.sidebar.columns(3)
        # Show at most six items, laid out in three columns.
        for idx, file in enumerate(files[:6]):
            with cols[idx % 3]:
                if "Images" in gallery_type:
                    st.image(Image.open(file), caption=os.path.basename(file), use_column_width=True)
                elif "Videos" in gallery_type:
                    st.video(file)

# Sidebar Model Management
st.sidebar.subheader("Model Hub πŸ—‚οΈ")
model_type = st.sidebar.selectbox("Model Type", MODEL_TYPES)
model_options = get_model_options(model_type)
selected_model = st.sidebar.selectbox("Select Model", ["None"] + model_options)
if selected_model != "None" and st.sidebar.button("Load Model πŸ“‚"):
    load_titan(model_type, selected_model, "Loading... ⏳")

# Tabs
tab1, tab2, tab3, tab4 = st.tabs([
    "Build Titan 🌱",
    "Fine-Tune Titans πŸ”§",
    "Test Titans πŸ§ͺ",
    "Camera Snap πŸ“·",
])

with tab1:
    st.header("Build Titan 🌱 (Start Small!)")
    build_type = st.selectbox("Model Type", MODEL_TYPES, key="build_type")
    # Offer checkpoints for the type chosen in THIS tab, not the sidebar's.
    base_model = st.selectbox("Select Model", get_model_options(build_type), key="build_model")
    if st.button("Download Model ⬇️"):
        load_titan(build_type, base_model, "Fetching... ⏳")
        st.success("Titan ready! πŸŽ‰")

with tab2:
    st.header("Fine-Tune Titans πŸ”§ (Sharpen Up!)")
    if 'builder' not in st.session_state or not st.session_state.get('model_loaded', False):
        st.warning("Load a Titan first! ⚠️")
    else:
        builder = st.session_state['builder']
        if isinstance(builder, ModelBuilder):
            st.subheader("NLP Tune 🧠")
            uploaded_csv = st.file_uploader("Upload CSV", type="csv", key="nlp_csv")
            if uploaded_csv and st.button("Tune NLP πŸ”„"):
                try:
                    rows = read_sft_rows(uploaded_csv)
                except (ValueError, UnicodeDecodeError) as err:
                    st.error(f"Could not read CSV: {err}")
                else:
                    if not rows:
                        st.error("CSV contains no rows.")
                    else:
                        tune_nlp(builder, rows)
                        st.success("NLP tuned! πŸŽ‰")
        elif isinstance(builder, DiffusionBuilder):
            st.subheader("CV Tune 🎨")
            uploaded_files = st.file_uploader(
                "Upload Images", type=["png", "jpg"], accept_multiple_files=True, key="cv_upload"
            )
            text_input = st.text_area("Text (one per image)", "Bat Neon\nIron Glow", key="cv_text")
            if uploaded_files and st.button("Tune CV πŸ”„"):
                images = [Image.open(f).convert("RGB") for f in uploaded_files]
                texts = text_input.splitlines()[:len(images)]
                tune_cv(builder, images, texts)
                # Keep the training images on disk so they show up in the gallery.
                for img, text in zip(images, texts):
                    img.save(generate_filename(text))
                st.success("CV tuned! πŸŽ‰")

with tab3:
    st.header("Test Titans πŸ§ͺ (Showtime!)")
    if 'builder' not in st.session_state or not st.session_state.get('model_loaded', False):
        st.warning("Load a Titan first! ⚠️")
    else:
        builder = st.session_state['builder']
        if isinstance(builder, ModelBuilder):
            st.subheader("NLP Test 🧠")
            prompt = st.text_area("Prompt", "What’s a superhero party?", key="nlp_test")
            if st.button("Test NLP ▢️"):
                result = builder.evaluate(prompt)
                st.write(f"**Answer**: {result}")
        elif isinstance(builder, DiffusionBuilder):
            st.subheader("CV Test 🎨")
            prompt = st.text_area("Prompt", "Neon Batman", key="cv_test")
            if st.button("Test CV ▢️"):
                with st.spinner("Generating... ⏳"):
                    img = builder.generate(prompt)
                st.image(img, caption="Generated Art")

with tab4:
    st.header("Camera Snap πŸ“· (Live Action!)")
    ctx = webrtc_streamer(
        key="camera",
        video_transformer_factory=VideoSnapshot,
        rtc_configuration={"iceServers": [{"urls": ["stun:stun.l.google.com:19302"]}]},
    )
    if ctx.video_transformer:
        snapshot_text = st.text_input("Snapshot Text", "Live Snap")
        if st.button("Snap It! πŸ“Έ"):
            snapshot = ctx.video_transformer.take_snapshot()
            if snapshot is not None:
                filename = generate_filename(snapshot_text)
                snapshot.save(filename)
                st.image(snapshot, caption=filename)
                st.success("Snapped! πŸŽ‰")
            else:
                st.warning("No frame captured yet.")

    # Demo Dataset
    st.subheader("Demo CV Dataset 🎨")
    demo_texts = ["Bat Neon", "Iron Glow", "Thor Spark"]
    # Filenames embed a timestamp; cache them per session so reruns reuse the
    # same placeholder images instead of writing new files every time.
    if 'demo_images' not in st.session_state:
        st.session_state['demo_images'] = [generate_filename(t) for t in demo_texts]
    demo_images = st.session_state['demo_images']
    for img_path in demo_images:
        if not os.path.exists(img_path):
            Image.new("RGB", (100, 100)).save(img_path)
    st.code(
        "\n".join(
            f"{i + 1}. {text} -> {img_path}"
            for i, (text, img_path) in enumerate(zip(demo_texts, demo_images))
        ),
        language="text",
    )
    if st.button("Download Demo CSV πŸ“"):
        csv_path = f"demo_cv_{int(time.time())}.csv"
        with open(csv_path, "w", newline="") as f:
            writer = csv.writer(f)
            writer.writerow(["image", "text"])
            writer.writerows(zip(demo_images, demo_texts))
        st.markdown(get_download_link(csv_path, "text/csv", "Download Demo CSV"), unsafe_allow_html=True)