#!/usr/bin/env python3
import os
import shutil
import glob
import base64
import streamlit as st
import pandas as pd
import numpy as np
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
from torch.utils.data import Dataset, DataLoader
import csv
import time
from dataclasses import dataclass
from typing import Optional, Tuple
import zipfile
import math
from PIL import Image
import random
import logging
from datetime import datetime
import pytz
from diffusers import StableDiffusionPipeline
from urllib.parse import quote
import cv2

# Logging setup
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Page Configuration
st.set_page_config(page_title="SFT Tiny Titans 🚀", page_icon="🤖", layout="wide", initial_sidebar_state="expanded")

# Model Configurations
@dataclass
class ModelConfig:
    name: str
    base_model: str
    size: str
    domain: Optional[str] = None
    model_type: str = "causal_lm"

    @property
    def model_path(self):
        return f"models/{self.name}"

@dataclass
class DiffusionConfig:
    name: str
    base_model: str
    size: str

    @property
    def model_path(self):
        return f"diffusion_models/{self.name}"

# Datasets
class SFTDataset(Dataset):
    def __init__(self, data, tokenizer, max_length=128):
        self.data = data
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        prompt = self.data[idx]["prompt"]
        response = self.data[idx]["response"]
        full_text = f"{prompt} {response}"
        full_encoding = self.tokenizer(full_text, max_length=self.max_length, padding="max_length", truncation=True, return_tensors="pt")
        prompt_encoding = self.tokenizer(prompt, max_length=self.max_length, padding=False, truncation=True, return_tensors="pt")
        input_ids = full_encoding["input_ids"].squeeze()
        attention_mask = full_encoding["attention_mask"].squeeze()
        labels = input_ids.clone()
        prompt_len = prompt_encoding["input_ids"].shape[1]
        if prompt_len < self.max_length:
            labels[:prompt_len] = -100  # Mask the prompt so loss is computed on the response only
        labels[attention_mask == 0] = -100  # Also mask padding tokens out of the loss
        return {"input_ids": input_ids, "attention_mask": attention_mask, "labels": labels}

class DiffusionDataset(Dataset):
    def __init__(self, images, texts):
        self.images = images
        self.texts = texts

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        return {"image": self.images[idx], "text": self.texts[idx]}

# Model Builders
class ModelBuilder:
    def __init__(self):
        self.config = None
        self.model = None
        self.tokenizer = None
        self.sft_data = None

    def load_model(self, model_path: str, config: Optional[ModelConfig] = None):
        self.model = AutoModelForCausalLM.from_pretrained(model_path)
        self.tokenizer = AutoTokenizer.from_pretrained(model_path)
        if self.tokenizer.pad_token is None:
            self.tokenizer.pad_token = self.tokenizer.eos_token
        if config:
            self.config = config
        return self

    def fine_tune_sft(self, csv_path: str, epochs: int = 3, batch_size: int = 4):
        self.sft_data = []
        with open(csv_path, "r") as f:
            reader = csv.DictReader(f)
            for row in reader:
                self.sft_data.append({"prompt": row["prompt"], "response": row["response"]})
        dataset = SFTDataset(self.sft_data, self.tokenizer)
        dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
        optimizer = torch.optim.AdamW(self.model.parameters(), lr=2e-5)
        self.model.train()
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model.to(device)
        for epoch in range(epochs):
            total_loss = 0
            for batch in dataloader:
                optimizer.zero_grad()
                input_ids = batch["input_ids"].to(device)
                attention_mask = batch["attention_mask"].to(device)
                labels = batch["labels"].to(device)
                outputs = self.model(input_ids=input_ids, attention_mask=attention_mask, labels=labels)
                loss = outputs.loss
                loss.backward()
                optimizer.step()
                total_loss += loss.item()
            st.write(f"Epoch {epoch + 1} completed. Average loss: {total_loss / len(dataloader):.4f}")
        return self

    def save_model(self, path: str):
        os.makedirs(os.path.dirname(path), exist_ok=True)
        self.model.save_pretrained(path)
        self.tokenizer.save_pretrained(path)

    def evaluate(self, prompt: str):
        self.model.eval()
        with torch.no_grad():
            inputs = self.tokenizer(prompt, return_tensors="pt", max_length=128, truncation=True).to(self.model.device)
            outputs = self.model.generate(**inputs, max_new_tokens=50, do_sample=True, top_p=0.95, temperature=0.7)
            return self.tokenizer.decode(outputs[0], skip_special_tokens=True)
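# Illustrative sketch (assumption: not called anywhere in this app): the CSV
# consumed by ModelBuilder.fine_tune_sft needs "prompt" and "response" columns.
# A minimal, hypothetical helper to produce such a file might look like this:
def write_sft_csv(rows, csv_path="sample_sft.csv"):
    """Write [{'prompt': ..., 'response': ...}, ...] to a CSV fine_tune_sft can read."""
    with open(csv_path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=["prompt", "response"])
        writer.writeheader()
        writer.writerows(rows)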
class DiffusionBuilder:
    def __init__(self):
        self.config = None
        self.pipeline = None

    def load_model(self, model_path: str, config: Optional[DiffusionConfig] = None):
        self.pipeline = StableDiffusionPipeline.from_pretrained(model_path)
        self.pipeline.to("cuda" if torch.cuda.is_available() else "cpu")
        if config:
            self.config = config
        return self

    def fine_tune_sft(self, images, texts, epochs=3):
        # PIL images cannot pass through the default DataLoader collate or the VAE,
        # so convert them to normalized [-1, 1] CHW tensors up front (assumes 512x512
        # is an acceptable training resolution for the loaded pipeline).
        def preprocess(img: Image.Image) -> torch.Tensor:
            img = img.convert("RGB").resize((512, 512))
            tensor = torch.from_numpy(np.array(img)).float() / 127.5 - 1.0
            return tensor.permute(2, 0, 1)

        image_tensors = [preprocess(img) for img in images]
        dataset = DiffusionDataset(image_tensors, texts)
        dataloader = DataLoader(dataset, batch_size=1, shuffle=True)
        optimizer = torch.optim.AdamW(self.pipeline.unet.parameters(), lr=1e-5)
        self.pipeline.unet.train()
        for epoch in range(epochs):
            total_loss = 0
            for batch in dataloader:
                optimizer.zero_grad()
                image = batch["image"].to(self.pipeline.device)
                text = batch["text"]
                latents = self.pipeline.vae.encode(image).latent_dist.sample()
                latents = latents * self.pipeline.vae.config.scaling_factor  # Scale latents to the UNet's expected range
                noise = torch.randn_like(latents)
                timesteps = torch.randint(0, self.pipeline.scheduler.config.num_train_timesteps, (latents.shape[0],), device=latents.device)
                noisy_latents = self.pipeline.scheduler.add_noise(latents, noise, timesteps)
                text_inputs = self.pipeline.tokenizer(text, padding="max_length", max_length=self.pipeline.tokenizer.model_max_length, truncation=True, return_tensors="pt")
                text_embeddings = self.pipeline.text_encoder(text_inputs.input_ids.to(self.pipeline.device))[0]
                pred_noise = self.pipeline.unet(noisy_latents, timesteps, encoder_hidden_states=text_embeddings).sample
                loss = torch.nn.functional.mse_loss(pred_noise, noise)
                loss.backward()
                optimizer.step()
                total_loss += loss.item()
            st.write(f"Epoch {epoch + 1} completed. Average loss: {total_loss / len(dataloader):.4f}")
        return self

    def save_model(self, path: str):
        os.makedirs(os.path.dirname(path), exist_ok=True)
        self.pipeline.save_pretrained(path)

    def generate(self, prompt: str):
        return self.pipeline(prompt, num_inference_steps=50).images[0]
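# Usage sketch (illustrative only; the file names and prompts below are made up):
#
#   builder = DiffusionBuilder().load_model("CompVis/stable-diffusion-v1-4")
#   imgs = [Image.open(p) for p in ["batman.png", "ironman.png"]]
#   builder.fine_tune_sft(imgs, ["Batman Neon", "Iron Man Hologram"], epochs=1)
#   builder.save_model("diffusion_models/demo-titan")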
# Utilities
def get_download_link(file_path, mime_type="text/plain", label="Download"):
    with open(file_path, 'rb') as f:
        data = f.read()
    b64 = base64.b64encode(data).decode()
    # Return an HTML anchor carrying the file as a base64 data URI
    return f'<a href="data:{mime_type};base64,{b64}" download="{os.path.basename(file_path)}">{label} 📥</a>'

def zip_directory(directory_path, zip_path):
    with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
        for root, _, files in os.walk(directory_path):
            for file in files:
                zipf.write(os.path.join(root, file), os.path.relpath(os.path.join(root, file), os.path.dirname(directory_path)))

def get_model_files(model_type="causal_lm"):
    path = "models/*" if model_type == "causal_lm" else "diffusion_models/*"
    return [d for d in glob.glob(path) if os.path.isdir(d)]

def get_gallery_files(file_types):
    return sorted([f for ext in file_types for f in glob.glob(f"*.{ext}")])

def generate_filename(text_line):
    central = pytz.timezone('US/Central')
    timestamp = datetime.now(central).strftime("%Y%m%d_%I%M%S_%p")
    safe_text = ''.join(c if c.isalnum() else '_' for c in text_line[:50])
    return f"{timestamp}_{safe_text}.png"

def display_search_links(query):
    search_urls = {
        "ArXiv": f"https://arxiv.org/search/?query={quote(query)}",
        "Wikipedia": f"https://en.wikipedia.org/wiki/{quote(query)}",
        "Google": f"https://www.google.com/search?q={quote(query)}",
        "YouTube": f"https://www.youtube.com/results?search_query={quote(query)}"
    }
    return ' '.join([f"[{name}]({url})" for name, url in search_urls.items()])

def detect_cameras():
    cameras = []
    for i in range(2):  # Check the first two device indices
        cap = cv2.VideoCapture(i)
        if cap.isOpened():
            cameras.append(i)
        cap.release()
    return cameras

# Agent Classes
class NLPAgent:
    def __init__(self, model, tokenizer):
        self.model = model
        self.tokenizer = tokenizer
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model.to(self.device)

    def generate(self, prompt: str) -> str:
        self.model.eval()
        with torch.no_grad():
            inputs = self.tokenizer(prompt, return_tensors="pt", max_length=128, truncation=True).to(self.device)
            outputs = self.model.generate(**inputs, max_new_tokens=100, do_sample=True, top_p=0.95, temperature=0.7)
            return self.tokenizer.decode(outputs[0], skip_special_tokens=True)

    def plan_party(self, task: str) -> pd.DataFrame:
        search_result = "Latest trends for 2025: Gold-plated Batman statues, VR superhero battles."
        prompt = f"Given this context: '{search_result}'\n{task}"
        plan_text = self.generate(prompt)  # Generated text is currently unused; the demo returns a fixed plan table
        st.markdown(f"Search Links: {display_search_links('superhero party trends')}", unsafe_allow_html=True)
        locations = {"Wayne Manor": (42.3601, -71.0589), "New York": (40.7128, -74.0060)}
        travel_times = {loc: calculate_cargo_travel_time(coords, locations["Wayne Manor"]) for loc, coords in locations.items() if loc != "Wayne Manor"}
        data = [
            {"Location": "New York", "Travel Time (hrs)": travel_times["New York"], "Idea": "Gold-plated Batman statues"},
            {"Location": "Wayne Manor", "Travel Time (hrs)": 0.0, "Idea": "VR superhero battles"}
        ]
        return pd.DataFrame(data)

class CVAgent:
    def __init__(self, pipeline):
        self.pipeline = pipeline

    def generate(self, prompt: str) -> Image.Image:
        return self.pipeline(prompt, num_inference_steps=50).images[0]

    def enhance_images(self, task: str) -> pd.DataFrame:
        search_result = "Latest superhero art trends: Neon outlines, 3D holograms."
        prompt = f"Given this context: '{search_result}'\n{task}"
        st.markdown(f"Search Links: {display_search_links('superhero art trends')}", unsafe_allow_html=True)
        data = [
            {"Image Theme": "Batman", "Enhancement": "Neon outlines"},
            {"Image Theme": "Iron Man", "Enhancement": "3D holograms"}
        ]
        return pd.DataFrame(data)

def calculate_cargo_travel_time(origin_coords: Tuple[float, float], destination_coords: Tuple[float, float], cruising_speed_kmh: float = 750.0) -> float:
    def to_radians(degrees: float) -> float:
        return degrees * (math.pi / 180)
    lat1, lon1 = map(to_radians, origin_coords)
    lat2, lon2 = map(to_radians, destination_coords)
    EARTH_RADIUS_KM = 6371.0
    dlon = lon2 - lon1
    dlat = lat2 - lat1
    # Haversine great-circle distance
    a = (math.sin(dlat / 2) ** 2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon / 2) ** 2)
    c = 2 * math.asin(math.sqrt(a))
    distance = EARTH_RADIUS_KM * c
    actual_distance = distance * 1.1  # 10% overhead for non-direct routing
    flight_time = (actual_distance / cruising_speed_kmh) + 1.0  # Plus a fixed 1 h for takeoff/landing
    return round(flight_time, 2)
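# Worked example for calculate_cargo_travel_time (illustrative, not executed):
# New York (40.7128, -74.0060) to Boston (42.3601, -71.0589) is roughly 306 km
# great-circle; the 10% routing overhead makes that ~337 km, so at 750 km/h
# plus the fixed 1 h overhead the function returns about 1.45 hours.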
prompt = f"Given this context: '{search_result}'\n{task}" st.markdown(f"Search Links: {display_search_links('superhero art trends')}", unsafe_allow_html=True) data = [ {"Image Theme": "Batman", "Enhancement": "Neon outlines"}, {"Image Theme": "Iron Man", "Enhancement": "3D holograms"} ] return pd.DataFrame(data) def calculate_cargo_travel_time(origin_coords: Tuple[float, float], destination_coords: Tuple[float, float], cruising_speed_kmh: float = 750.0) -> float: def to_radians(degrees: float) -> float: return degrees * (math.pi / 180) lat1, lon1 = map(to_radians, origin_coords) lat2, lon2 = map(to_radians, destination_coords) EARTH_RADIUS_KM = 6371.0 dlon = lon2 - lon1 dlat = lat2 - lat1 a = (math.sin(dlat / 2) ** 2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon / 2) ** 2) c = 2 * math.asin(math.sqrt(a)) distance = EARTH_RADIUS_KM * c actual_distance = distance * 1.1 flight_time = (actual_distance / cruising_speed_kmh) + 1.0 return round(flight_time, 2) # Main App st.title("SFT Tiny Titans ๐Ÿš€ (Small but Mighty!)") # Sidebar Galleries st.sidebar.header("Shared Galleries ๐ŸŽจ") for gallery_type, file_types, emoji in [ ("Images ๐Ÿ“ธ", ["png", "jpg", "jpeg"], "๐Ÿ–ผ๏ธ"), ("Videos ๐ŸŽฅ", ["mp4"], "๐ŸŽฌ"), ("Audio ๐ŸŽถ", ["mp3"], "๐ŸŽต") ]: st.sidebar.subheader(f"{gallery_type} {emoji}") files = get_gallery_files(file_types) if files: cols_num = st.sidebar.slider(f"{gallery_type} Columns", 1, 5, 3, key=f"{gallery_type}_cols") cols = st.sidebar.columns(cols_num) for idx, file in enumerate(files[:cols_num * 2]): with cols[idx % cols_num]: if "Images" in gallery_type: st.image(Image.open(file), caption=file, use_column_width=True) elif "Videos" in gallery_type: st.video(file) elif "Audio" in gallery_type: st.audio(file) st.sidebar.subheader("Model Management ๐Ÿ—‚๏ธ") model_type = st.sidebar.selectbox("Model Type", ["NLP (Causal LM)", "CV (Diffusion)"]) model_dirs = get_model_files("causal_lm" if "NLP" in model_type else "diffusion") selected_model = st.sidebar.selectbox("Select Saved Model", ["None"] + model_dirs) if selected_model != "None" and st.sidebar.button("Load Model ๐Ÿ“‚"): builder = ModelBuilder() if "NLP" in model_type else DiffusionBuilder() config = (ModelConfig if "NLP" in model_type else DiffusionConfig)(name=os.path.basename(selected_model), base_model="unknown", size="small") builder.load_model(selected_model, config) st.session_state['builder'] = builder st.session_state['model_loaded'] = True st.rerun() # Tabs tab1, tab2, tab3, tab4, tab5, tab6 = st.tabs([ "Build Titan ๐ŸŒฑ", "Fine-Tune NLP ๐Ÿง ", "Fine-Tune CV ๐ŸŽจ", "Test Titans ๐Ÿงช", "Agentic RAG ๐ŸŒ€", "Camera Inputs ๐Ÿ“ท" ]) with tab1: st.header("Build Your Titan ๐ŸŒฑ") model_type = st.selectbox("Model Type", ["NLP (Causal LM)", "CV (Diffusion)"], key="build_type") base_model = st.selectbox( "Select Tiny Model", ["HuggingFaceTB/SmolLM-135M", "Qwen/Qwen1.5-0.5B-Chat"] if "NLP" in model_type else ["stabilityai/stable-diffusion-2-1", "CompVis/stable-diffusion-v1-4"] ) model_name = st.text_input("Model Name", f"tiny-titan-{int(time.time())}") if st.button("Download Model โฌ‡๏ธ"): config = (ModelConfig if "NLP" in model_type else DiffusionConfig)(name=model_name, base_model=base_model, size="small") builder = ModelBuilder() if "NLP" in model_type else DiffusionBuilder() builder.load_model(base_model, config) builder.save_model(config.model_path) st.session_state['builder'] = builder st.session_state['model_loaded'] = True st.rerun() with tab2: st.header("Fine-Tune NLP Titan ๐Ÿง  (Word Wizardry!)") if 'builder' not in 
with tab3:
    st.header("Fine-Tune CV Titan 🎨 (Vision Vibes!)")
    if 'builder' not in st.session_state or not st.session_state.get('model_loaded', False) or not isinstance(st.session_state['builder'], DiffusionBuilder):
        st.warning("Load a CV Titan first! ⚠️")
    else:
        uploaded_files = st.file_uploader("Upload Images/Videos", type=["png", "jpg", "jpeg", "mp4", "mp3"], accept_multiple_files=True, key="cv_upload")
        text_input = st.text_area("Enter Text (one line per image)", "Batman Neon\nIron Man Hologram\nThor Lightning", key="cv_text")
        if uploaded_files and st.button("Tune the Visionary 🖌️"):
            images = [Image.open(f) for f in uploaded_files if f.type.startswith("image")]
            texts = text_input.splitlines()
            # Pad or trim the captions so they pair one-to-one with the images
            if len(images) > len(texts):
                texts.extend([""] * (len(images) - len(texts)))
            elif len(texts) > len(images):
                texts = texts[:len(images)]
            st.session_state['builder'].fine_tune_sft(images, texts)
            new_model_name = f"{st.session_state['builder'].config.name}-sft-{int(time.time())}"
            new_config = DiffusionConfig(name=new_model_name, base_model=st.session_state['builder'].config.base_model, size="small")
            st.session_state['builder'].config = new_config
            st.session_state['builder'].save_model(new_config.model_path)
            for img, text in zip(images, texts):
                filename = generate_filename(text)
                img.save(filename)
                st.image(img, caption=filename)
            zip_path = f"{new_config.model_path}.zip"
            zip_directory(new_config.model_path, zip_path)
            st.markdown(get_download_link(zip_path, "application/zip", "Download Tuned CV Titan"), unsafe_allow_html=True)

with tab4:
    st.header("Test Titans 🧪 (Brains & Eyes!)")
    if 'builder' not in st.session_state or not st.session_state.get('model_loaded', False):
        st.warning("Load a Titan first! ⚠️")
    else:
        if isinstance(st.session_state['builder'], ModelBuilder):
            st.subheader("NLP Test 🧠")
            test_prompt = st.text_area("Enter NLP Prompt", "Plan a superhero party!", key="nlp_test")
            if st.button("Test NLP Titan ▶️"):
                result = st.session_state['builder'].evaluate(test_prompt)
                st.write(f"**Response**: {result}")
        elif isinstance(st.session_state['builder'], DiffusionBuilder):
            st.subheader("CV Test 🎨")
            test_prompt = st.text_area("Enter CV Prompt", "Superhero in neon style", key="cv_test")
            if st.button("Test CV Titan ▶️"):
                image = st.session_state['builder'].generate(test_prompt)
                st.image(image, caption="Generated Image")
        cameras = detect_cameras()
        if cameras:
            st.subheader("Camera Snapshot Test 📷")
            camera_idx = st.selectbox("Select Camera", cameras, key="camera_select")
            snapshot_text = st.text_input("Snapshot Text", "Camera Snap", key="snap_text")
            if st.button("Capture Snapshot 📸"):
                cap = cv2.VideoCapture(camera_idx)
                ret, frame = cap.read()
                cap.release()  # Release the device whether or not the read succeeded
                if ret:
                    rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                    img = Image.fromarray(rgb_frame)
                    filename = generate_filename(snapshot_text)
                    img.save(filename)
                    st.image(img, caption=filename)
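# Refactor sketch (assumption: not wired into the tabs above): both camera
# sections repeat the same capture-and-save steps; a shared helper could be:
def capture_snapshot(camera_idx: int, text: str) -> Optional[str]:
    """Grab one frame from a camera, save it via generate_filename, return the path."""
    cap = cv2.VideoCapture(camera_idx)
    ret, frame = cap.read()
    cap.release()
    if not ret:
        return None
    img = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    filename = generate_filename(text)
    img.save(filename)
    return filename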
โš ๏ธ") else: if isinstance(st.session_state['builder'], ModelBuilder): st.subheader("NLP Test ๐Ÿง ") test_prompt = st.text_area("Enter NLP Prompt", "Plan a superhero party!", key="nlp_test") if st.button("Test NLP Titan โ–ถ๏ธ"): result = st.session_state['builder'].evaluate(test_prompt) st.write(f"**Response**: {result}") elif isinstance(st.session_state['builder'], DiffusionBuilder): st.subheader("CV Test ๐ŸŽจ") test_prompt = st.text_area("Enter CV Prompt", "Superhero in neon style", key="cv_test") if st.button("Test CV Titan โ–ถ๏ธ"): image = st.session_state['builder'].generate(test_prompt) st.image(image, caption="Generated Image") cameras = detect_cameras() if cameras: st.subheader("Camera Snapshot Test ๐Ÿ“ท") camera_idx = st.selectbox("Select Camera", cameras, key="camera_select") snapshot_text = st.text_input("Snapshot Text", "Camera Snap", key="snap_text") if st.button("Capture Snapshot ๐Ÿ“ธ"): cap = cv2.VideoCapture(camera_idx) ret, frame = cap.read() if ret: rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) img = Image.fromarray(rgb_frame) filename = generate_filename(snapshot_text) img.save(filename) st.image(img, caption=filename) cap.release() with tab5: st.header("Agentic RAG ๐ŸŒ€ (Smart Plans & Visions!)") if 'builder' not in st.session_state or not st.session_state.get('model_loaded', False): st.warning("Load a Titan first! โš ๏ธ") else: if isinstance(st.session_state['builder'], ModelBuilder): st.subheader("NLP RAG Party ๐Ÿง ") if st.button("Run NLP RAG Demo ๐ŸŽ‰"): agent = NLPAgent(st.session_state['builder'].model, st.session_state['builder'].tokenizer) task = "Plan a luxury superhero-themed party at Wayne Manor." plan_df = agent.plan_party(task) st.dataframe(plan_df) elif isinstance(st.session_state['builder'], DiffusionBuilder): st.subheader("CV RAG Enhance ๐ŸŽจ") if st.button("Run CV RAG Demo ๐Ÿ–Œ๏ธ"): agent = CVAgent(st.session_state['builder'].pipeline) task = "Enhance superhero images with 2025 trends." enhance_df = agent.enhance_images(task) st.dataframe(enhance_df) with tab6: st.header("Camera Inputs ๐Ÿ“ท (Live Feed Fun!)") cameras = detect_cameras() if not cameras: st.warning("No cameras detected! โš ๏ธ") else: st.write(f"Detected {len(cameras)} cameras!") for idx in cameras: st.subheader(f"Camera {idx}") cap = cv2.VideoCapture(idx) if st.button(f"Capture from Camera {idx} ๐Ÿ“ธ", key=f"cap_{idx}"): ret, frame = cap.read() if ret: rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) img = Image.fromarray(rgb_frame) filename = generate_filename(f"Camera_{idx}_snap") img.save(filename) st.image(img, caption=filename) cap.release() # Preload demo files demo_images = ["20250319_010000_AM_Batman.png", "20250319_010001_AM_IronMan.png", "20250319_010002_AM_Thor.png"] demo_videos = ["20250319_010000_AM_Batman.mp4", "20250319_010001_AM_IronMan.mp4", "20250319_010002_AM_Thor.mp4"] for img in demo_images: if not os.path.exists(img): Image.new("RGB", (100, 100)).save(img) for vid in demo_videos: if not os.path.exists(vid): with open(vid, "wb") as f: f.write(b"") # Dummy file # Demo SFT Dataset st.subheader("Diffusion SFT Demo Dataset ๐ŸŽจ") demo_texts = ["Batman Neon", "Iron Man Hologram", "Thor Lightning"] demo_code = "\n".join([f"{i+1}. 
{text} -> {demo_images[i]}" for i, text in enumerate(demo_texts)]) st.code(demo_code, language="text") if st.button("Download Demo CSV ๐Ÿ“"): csv_path = f"demo_diffusion_sft_{int(time.time())}.csv" with open(csv_path, "w", newline="") as f: writer = csv.writer(f) writer.writerow(["image", "text"]) for img, text in zip(demo_images, demo_texts): writer.writerow([img, text]) st.markdown(get_download_link(csv_path, "text/csv", "Download Demo CSV"), unsafe_allow_html=True)