#!/usr/bin/env python3 import os import base64 import streamlit as st import csv import time from dataclasses import dataclass import zipfile import logging import av from streamlit_webrtc import webrtc_streamer, VideoProcessorBase, WebRtcMode # Logging setup logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") logger = logging.getLogger(__name__) log_records = [] class LogCaptureHandler(logging.Handler): def emit(self, record): log_records.append(record) logger.addHandler(LogCaptureHandler()) st.set_page_config(page_title="SFT Tiny Titans πŸš€", page_icon="πŸ€–", layout="wide", initial_sidebar_state="expanded") # Model Configurations @dataclass class ModelConfig: name: str base_model: str model_type: str = "causal_lm" @property def model_path(self): return f"models/{self.name}" @dataclass class DiffusionConfig: name: str base_model: str @property def model_path(self): return f"diffusion_models/{self.name}" # Lazy-loaded Builders class ModelBuilder: def __init__(self): self.config = None self.model = None self.tokenizer = None def load_model(self, model_path: str, config: ModelConfig): try: from transformers import AutoModelForCausalLM, AutoTokenizer import torch logger.info(f"Loading NLP model: {model_path}") self.model = AutoModelForCausalLM.from_pretrained(model_path) self.tokenizer = AutoTokenizer.from_pretrained(model_path) if self.tokenizer.pad_token is None: self.tokenizer.pad_token = self.tokenizer.eos_token self.config = config self.model.to(torch.device("cuda" if torch.cuda.is_available() else "cpu")) logger.info("NLP model loaded successfully") except Exception as e: logger.error(f"Error loading NLP model: {str(e)}") raise def fine_tune(self, csv_path): try: from torch.utils.data import Dataset, DataLoader import torch logger.info(f"Starting NLP fine-tuning with {csv_path}") class SFTDataset(Dataset): def __init__(self, data, tokenizer): self.data = data self.tokenizer = tokenizer def __len__(self): return len(self.data) def __getitem__(self, idx): prompt = self.data[idx]["prompt"] response = self.data[idx]["response"] inputs = self.tokenizer(f"{prompt} {response}", return_tensors="pt", padding="max_length", max_length=128, truncation=True) labels = inputs["input_ids"].clone() labels[0, :len(self.tokenizer(prompt)["input_ids"][0])] = -100 return {"input_ids": inputs["input_ids"][0], "attention_mask": inputs["attention_mask"][0], "labels": labels[0]} data = [] with open(csv_path, "r") as f: reader = csv.DictReader(f) for row in reader: data.append({"prompt": row["prompt"], "response": row["response"]}) dataset = SFTDataset(data, self.tokenizer) dataloader = DataLoader(dataset, batch_size=2) optimizer = torch.optim.AdamW(self.model.parameters(), lr=2e-5) self.model.train() for _ in range(1): for batch in dataloader: optimizer.zero_grad() outputs = self.model(**{k: v.to(self.model.device) for k, v in batch.items()}) outputs.loss.backward() optimizer.step() logger.info("NLP fine-tuning completed") except Exception as e: logger.error(f"Error in NLP fine-tuning: {str(e)}") raise def evaluate(self, prompt: str): try: import torch logger.info(f"Evaluating NLP with prompt: {prompt}") self.model.eval() with torch.no_grad(): inputs = self.tokenizer(prompt, return_tensors="pt", max_length=128, truncation=True).to(self.model.device) outputs = self.model.generate(**inputs, max_new_tokens=50) result = self.tokenizer.decode(outputs[0], skip_special_tokens=True) logger.info(f"NLP evaluation result: {result}") return result except Exception as e: logger.error(f"Error in NLP evaluation: {str(e)}") raise class DiffusionBuilder: def __init__(self): self.config = None self.pipeline = None def load_model(self, model_path: str, config: DiffusionConfig): try: from diffusers import StableDiffusionPipeline import torch logger.info(f"Loading diffusion model: {model_path}") self.pipeline = StableDiffusionPipeline.from_pretrained(model_path) self.pipeline.to(torch.device("cuda" if torch.cuda.is_available() else "cpu")) self.config = config logger.info("Diffusion model loaded successfully") except Exception as e: logger.error(f"Error loading diffusion model: {str(e)}") raise def fine_tune(self, images, texts): try: import torch from PIL import Image import numpy as np logger.info("Starting diffusion fine-tuning") optimizer = torch.optim.AdamW(self.pipeline.unet.parameters(), lr=1e-5) self.pipeline.unet.train() for _ in range(1): for img, text in zip(images, texts): optimizer.zero_grad() img_tensor = torch.tensor(np.array(img)).permute(2, 0, 1).unsqueeze(0).float().to(self.pipeline.device) / 255.0 latents = self.pipeline.vae.encode(img_tensor).latent_dist.sample() noise = torch.randn_like(latents) timesteps = torch.randint(0, self.pipeline.scheduler.num_train_timesteps, (1,), device=latents.device) noisy_latents = self.pipeline.scheduler.add_noise(latents, noise, timesteps) text_emb = self.pipeline.text_encoder(self.pipeline.tokenizer(text, return_tensors="pt").input_ids.to(self.pipeline.device))[0] pred_noise = self.pipeline.unet(noisy_latents, timesteps, encoder_hidden_states=text_emb).sample loss = torch.nn.functional.mse_loss(pred_noise, noise) loss.backward() optimizer.step() logger.info("Diffusion fine-tuning completed") except Exception as e: logger.error(f"Error in diffusion fine-tuning: {str(e)}") raise def generate(self, prompt: str): try: logger.info(f"Generating image with prompt: {prompt}") img = self.pipeline(prompt, num_inference_steps=20).images[0] logger.info("Image generated successfully") return img except Exception as e: logger.error(f"Error in image generation: {str(e)}") raise # Utilities def get_download_link(file_path, mime_type="text/plain", label="Download"): with open(file_path, 'rb') as f: data = f.read() b64 = base64.b64encode(data).decode() return f'{label} πŸ“₯' def generate_filename(sequence, ext="png"): from datetime import datetime import pytz central = pytz.timezone('US/Central') timestamp = datetime.now(central).strftime("%d%m%Y%H%M%S%p") return f"{sequence}{timestamp}.{ext}" def get_gallery_files(file_types): import glob return sorted([f for ext in file_types for f in glob.glob(f"*.{ext}")]) def zip_files(files, zip_name): with zipfile.ZipFile(zip_name, 'w', zipfile.ZIP_DEFLATED) as zipf: for file in files: zipf.write(file, os.path.basename(file)) return zip_name # Video Processor for WebRTC class CameraProcessor(VideoProcessorBase): def __init__(self): self.frame = None def recv(self, frame): from PIL import Image img = frame.to_image() self.frame = img return av.VideoFrame.from_image(img) def capture_frame(self): from PIL import Image return self.frame def capture_video(self): from PIL import Image frames = [] start_time = time.time() while time.time() - start_time < 10 and self.frame: frames.append(np.array(self.frame)) time.sleep(0.033) # ~30 FPS return frames # Main App st.title("SFT Tiny Titans πŸš€ (Dual Cam Action!)") # Sidebar Galleries st.sidebar.header("Captured Media 🎨") gallery_container = st.sidebar.empty() def update_gallery(): media_files = get_gallery_files(["png", "mp4"]) with gallery_container: if media_files: cols = st.columns(2) for idx, file in enumerate(media_files[:4]): with cols[idx % 2]: if file.endswith(".png"): from PIL import Image st.image(Image.open(file), caption=file.split('/')[-1], use_container_width=True) elif file.endswith(".mp4"): st.video(file) # Sidebar Model Management st.sidebar.subheader("Model Hub πŸ—‚οΈ") model_type = st.sidebar.selectbox("Model Type", ["NLP (Causal LM)", "CV (Diffusion)"]) model_options = { "NLP (Causal LM)": "HuggingFaceTB/SmolLM-135M", "CV (Diffusion)": ["CompVis/stable-diffusion-v1-4", "stabilityai/stable-diffusion-2-base", "runwayml/stable-diffusion-v1-5"] } selected_model = st.sidebar.selectbox("Select Model", ["None"] + ([model_options[model_type]] if "NLP" in model_type else model_options[model_type])) if selected_model != "None" and st.sidebar.button("Load Model πŸ“‚"): builder = ModelBuilder() if "NLP" in model_type else DiffusionBuilder() config = (ModelConfig if "NLP" in model_type else DiffusionConfig)(name=f"titan_{int(time.time())}", base_model=selected_model) with st.spinner("Loading... ⏳"): try: builder.load_model(selected_model, config) st.session_state['builder'] = builder st.session_state['model_loaded'] = True st.success("Model loaded! πŸŽ‰") except Exception as e: st.error(f"Load failed: {str(e)}") # Tabs tab1, tab2, tab3, tab4 = st.tabs(["Build Titan 🌱", "Camera Snap πŸ“·", "Fine-Tune Titans πŸ”§", "Test Titans πŸ§ͺ"]) with tab1: st.header("Build Titan 🌱 (Quick Start!)") model_type = st.selectbox("Model Type", ["NLP (Causal LM)", "CV (Diffusion)"], key="build_type") base_model = st.selectbox("Select Model", model_options[model_type], key="build_model") if st.button("Download Model ⬇️"): config = (ModelConfig if "NLP" in model_type else DiffusionConfig)(name=f"titan_{int(time.time())}", base_model=base_model) builder = ModelBuilder() if "NLP" in model_type else DiffusionBuilder() with st.spinner("Fetching... ⏳"): try: builder.load_model(base_model, config) st.session_state['builder'] = builder st.session_state['model_loaded'] = True st.success("Titan up! πŸŽ‰") except Exception as e: st.error(f"Download failed: {str(e)}") with tab2: st.header("Camera Snap πŸ“· (Dual Live Feed!)") cols = st.columns(2) processors = {} for i in range(2): with cols[i]: st.subheader(f"Camera {i}") key = f"camera_{i}" processors[key] = webrtc_streamer( key=key, mode=WebRtcMode.SENDRECV, video_processor_factory=CameraProcessor, frontend_rtc_configuration={"iceServers": [{"urls": ["stun:stun.l.google.com:19302"]}]} ) if st.button(f"Capture Frame πŸ“Έ Cam {i}", key=f"snap_{i}"): logger.info(f"Capturing frame from Camera {i}") try: if processors[key].video_processor and processors[key].video_processor.frame: snapshot = processors[key].video_processor.capture_frame() filename = generate_filename(i) snapshot.save(filename) st.image(snapshot, caption=filename, use_container_width=True) logger.info(f"Saved snapshot: {filename}") if 'captured_images' not in st.session_state: st.session_state['captured_images'] = [] st.session_state['captured_images'].append(filename) update_gallery() else: st.error("No frame available!") logger.error(f"No frame captured from Camera {i}") except Exception as e: st.error(f"Frame capture failed: {str(e)}") logger.error(f"Error capturing frame: {str(e)}") if st.button(f"Capture Video πŸŽ₯ Cam {i}", key=f"rec_{i}"): logger.info(f"Capturing 10s video from Camera {i}") try: if processors[key].video_processor: frames = processors[key].video_processor.capture_video() if frames: mp4_filename = generate_filename(i, "mp4") with av.open(mp4_filename, "w") as container: stream = container.add_stream("h264", rate=30) stream.width = frames[0].shape[1] stream.height = frames[0].shape[0] for frame in frames: av_frame = av.VideoFrame.from_ndarray(frame, format="rgb24") for packet in stream.encode(av_frame): container.mux(packet) for packet in stream.encode(): container.mux(packet) st.video(mp4_filename) logger.info(f"Saved video: {mp4_filename}") sliced_images = [] step = max(1, len(frames) // 10) for j in range(0, len(frames), step): if len(sliced_images) < 10: img = Image.fromarray(frames[j]) img_filename = generate_filename(f"{i}_{len(sliced_images)}") img.save(img_filename) sliced_images.append(img_filename) st.image(img, caption=img_filename, use_container_width=True) st.session_state['captured_images'] = st.session_state.get('captured_images', []) + sliced_images logger.info(f"Sliced video into {len(sliced_images)} images") update_gallery() else: st.error("No frames recorded!") logger.error("No frames captured during video recording") else: st.error("Camera processor not initialized!") logger.error(f"Processor not ready for Camera {i}") except Exception as e: st.error(f"Video capture failed: {str(e)}") logger.error(f"Error capturing video: {str(e)}") with tab3: st.header("Fine-Tune Titans πŸ”§ (Tune Fast!)") if 'builder' not in st.session_state or not st.session_state.get('model_loaded', False): st.warning("Load a Titan first! ⚠️") else: if isinstance(st.session_state['builder'], ModelBuilder): st.subheader("NLP Tune 🧠") uploaded_csv = st.file_uploader("Upload CSV", type="csv", key="nlp_csv") if uploaded_csv and st.button("Tune NLP πŸ”„"): logger.info("Initiating NLP fine-tune") try: with open("temp.csv", "wb") as f: f.write(uploaded_csv.read()) st.session_state['builder'].fine_tune("temp.csv") st.success("NLP sharpened! πŸŽ‰") except Exception as e: st.error(f"NLP fine-tune failed: {str(e)}") elif isinstance(st.session_state['builder'], DiffusionBuilder): st.subheader("CV Tune 🎨") captured_images = get_gallery_files(["png"]) if len(captured_images) >= 2: texts = ["Superhero Neon", "Hero Glow", "Cape Spark"][:len(captured_images)] if st.button("Tune CV πŸ”„"): logger.info("Initiating CV fine-tune") try: from PIL import Image images = [Image.open(img) for img in captured_images] st.session_state['builder'].fine_tune(images, texts) st.success("CV polished! πŸŽ‰") except Exception as e: st.error(f"CV fine-tune failed: {str(e)}") else: st.warning("Capture at least 2 images first! ⚠️") with tab4: st.header("Test Titans πŸ§ͺ (Image Agent Demo!)") if 'builder' not in st.session_state or not st.session_state.get('model_loaded', False): st.warning("Load a Titan first! ⚠️") else: if isinstance(st.session_state['builder'], ModelBuilder): st.subheader("NLP Test 🧠") prompt = st.text_area("Prompt", "What’s a superhero?", key="nlp_test") if st.button("Test NLP ▢️"): logger.info("Running NLP test") try: result = st.session_state['builder'].evaluate(prompt) st.write(f"**Answer**: {result}") except Exception as e: st.error(f"NLP test failed: {str(e)}") elif isinstance(st.session_state['builder'], DiffusionBuilder): st.subheader("CV Test 🎨 (Image Set Demo)") captured_images = get_gallery_files(["png"]) if len(captured_images) >= 2: if st.button("Run CV Demo ▢️"): logger.info("Running CV image set demo") try: from PIL import Image images = [Image.open(img) for img in captured_images[:10]] prompts = ["Neon " + os.path.basename(img).split('.')[0] for img in captured_images[:10]] generated_images = [] for prompt in prompts: img = st.session_state['builder'].generate(prompt) generated_images.append(img) cols = st.columns(2) for idx, (orig, gen) in enumerate(zip(images, generated_images)): with cols[idx % 2]: st.image(orig, caption=f"Original: {captured_images[idx]}", use_container_width=True) st.image(gen, caption=f"Generated: {prompts[idx]}", use_container_width=True) md_content = "# Image Set Demo\n\nScript of filenames and descriptions:\n" for i, (img, prompt) in enumerate(zip(captured_images[:10], prompts)): md_content += f"{i+1}. `{img}` - {prompt}\n" md_filename = f"demo_metadata_{int(time.time())}.md" with open(md_filename, "w") as f: f.write(md_content) st.markdown(get_download_link(md_filename, "text/markdown", "Download Metadata .md"), unsafe_allow_html=True) logger.info("CV demo completed with metadata") except Exception as e: st.error(f"CV demo failed: {str(e)}") logger.error(f"Error in CV demo: {str(e)}") else: st.warning("Capture at least 2 images first! ⚠️") # Display Logs st.sidebar.subheader("Action Logs πŸ“œ") log_container = st.sidebar.empty() with log_container: for record in log_records: st.write(f"{record.asctime} - {record.levelname} - {record.message}") update_gallery() # Initial gallery update