awacke1's picture
Update app.py
4cb9027 verified
raw
history blame
13 kB
#!/usr/bin/env python3
import base64
import csv
import glob
import html
import io
import os
import time
from dataclasses import dataclass
from datetime import datetime

import av
import numpy as np
import pandas as pd
import pytz
import streamlit as st
from PIL import Image
from streamlit_webrtc import webrtc_streamer, VideoTransformerBase
# Minimal initial imports to reduce startup delay
# Page-level settings: title, icon, wide layout, sidebar expanded on load.
st.set_page_config(
    page_title="SFT Tiny Titans 🚀",
    page_icon="🤖",
    layout="wide",
    initial_sidebar_state="expanded",
)
# Model Configurations
@dataclass
class ModelConfig:
    """Settings for a causal language model handled by the app."""

    # Identifier of this model instance; also used to derive model_path.
    name: str
    # Name of the base checkpoint this model comes from.
    base_model: str
    # Kind of model; defaults to "causal_lm".
    model_type: str = "causal_lm"

    @property
    def model_path(self):
        """Return the relative directory for this model, ``models/<name>``."""
        return "models/{}".format(self.name)
@dataclass
class DiffusionConfig:
    """Settings for a diffusion model handled by the app."""

    # Identifier of this model instance; also used to derive model_path.
    name: str
    # Name of the base checkpoint this model comes from.
    base_model: str

    @property
    def model_path(self):
        """Return the relative directory for this model, ``diffusion_models/<name>``."""
        return "diffusion_models/{}".format(self.name)
# Lazy-loaded Builders
class ModelBuilder:
    """Holds a causal language model and its tokenizer, loaded on demand."""

    def __init__(self):
        # Nothing is loaded until load_model() is called.
        self.config = None
        self.model = None
        self.tokenizer = None

    def load_model(self, model_path: str, config: ModelConfig):
        """Load the model and tokenizer from ``model_path`` and keep ``config``.

        The model is moved to CUDA when available, otherwise to the CPU.
        """
        # Imported here rather than at module level so these libraries are
        # only pulled in when a model is actually loaded.
        from transformers import AutoModelForCausalLM, AutoTokenizer
        import torch

        self.model = AutoModelForCausalLM.from_pretrained(model_path)
        self.tokenizer = AutoTokenizer.from_pretrained(model_path)
        # Fall back to the EOS token when the tokenizer has no pad token.
        if self.tokenizer.pad_token is None:
            self.tokenizer.pad_token = self.tokenizer.eos_token
        self.config = config

        device_name = "cuda" if torch.cuda.is_available() else "cpu"
        self.model.to(torch.device(device_name))

    def evaluate(self, prompt: str):
        """Generate text for ``prompt`` and return the decoded output.

        The prompt is truncated to 128 tokens and up to 50 new tokens are
        generated; special tokens are dropped from the returned string.
        """
        import torch

        self.model.eval()
        with torch.no_grad():
            encoded = self.tokenizer(
                prompt,
                return_tensors="pt",
                max_length=128,
                truncation=True,
            )
            encoded = encoded.to(self.model.device)
            generated = self.model.generate(**encoded, max_new_tokens=50)
            return self.tokenizer.decode(generated[0], skip_special_tokens=True)
class DiffusionBuilder:
    """Holds a Stable Diffusion pipeline, loaded on demand."""

    def __init__(self):
        # Nothing is loaded until load_model() is called.
        self.config = None
        self.pipeline = None

    def load_model(self, model_path: str, config: DiffusionConfig):
        """Load the pipeline from ``model_path`` and keep ``config``.

        The pipeline is moved to CUDA when available, otherwise to the CPU.
        """
        # Imported here rather than at module level so these libraries are
        # only pulled in when a pipeline is actually loaded.
        from diffusers import StableDiffusionPipeline
        import torch

        self.pipeline = StableDiffusionPipeline.from_pretrained(model_path)
        device_name = "cuda" if torch.cuda.is_available() else "cpu"
        self.pipeline.to(torch.device(device_name))
        self.config = config

    def generate(self, prompt: str):
        """Run the pipeline on ``prompt`` for 20 steps and return the first image."""
        result = self.pipeline(prompt, num_inference_steps=20)
        return result.images[0]
# Utilities
def get_download_link(file_path, mime_type="text/plain", label="Download"):
    """Return an HTML ``<a>`` tag that downloads ``file_path`` via a data URI.

    Args:
        file_path: Path of the file whose bytes are embedded in the link.
        mime_type: MIME type placed in the data URI.
        label: Link text; inserted verbatim, followed by " 📥".

    Returns:
        The anchor element as a string, with the file content base64-encoded
        in ``href`` and the file's base name in ``download``.

    Raises:
        OSError: If ``file_path`` cannot be opened or read.
    """
    with open(file_path, "rb") as f:
        encoded = base64.b64encode(f.read()).decode("ascii")
    # Escape the attribute values so a quote or angle bracket in the file
    # name or MIME type cannot terminate the attribute and corrupt the tag.
    file_name = html.escape(os.path.basename(file_path), quote=True)
    safe_mime = html.escape(mime_type, quote=True)
    return f'<a href="data:{safe_mime};base64,{encoded}" download="{file_name}">{label} 📥</a>'
def generate_filename(text_line):
    """Build a PNG file name from the current US/Central time and ``text_line``.

    Only the first 50 characters of ``text_line`` are used, and every
    non-alphanumeric character among them is replaced with an underscore.
    The timestamp uses a 12-hour clock followed by an AM/PM marker.
    """
    now_central = datetime.now(pytz.timezone('US/Central'))
    stamp = now_central.strftime("%Y%m%d_%I%M%S_%p")
    sanitized = []
    for ch in text_line[:50]:
        sanitized.append(ch if ch.isalnum() else '_')
    return "{}_{}.png".format(stamp, ''.join(sanitized))
def get_gallery_files(file_types):
    """Return the sorted names of files in the working directory by extension.

    Args:
        file_types: Iterable of extensions without the leading dot,
            e.g. ``["png", "jpg"]``.

    Returns:
        A sorted list of file names matching ``*.<ext>`` for any of the given
        extensions; empty when nothing matches or ``file_types`` is empty.
    """
    # Relies on the module-level ``import glob``; the name was previously
    # used here without being imported.
    matches = []
    for ext in file_types:
        matches.extend(glob.glob(f"*.{ext}"))
    return sorted(matches)
# Video Transformer for WebRTC
class VideoSnapshot(VideoTransformerBase):
    """Pass-through video transformer that remembers the most recent frame."""

    def __init__(self):
        # Latest frame as a BGR ndarray; None until transform() has run.
        self.snapshot = None

    def transform(self, frame):
        """Return the frame unchanged as a BGR array, keeping it for snapshots."""
        img = frame.to_ndarray(format="bgr24")
        # Remember the frame; without this take_snapshot() could never
        # return anything because self.snapshot stayed None forever.
        self.snapshot = img
        return img

    def take_snapshot(self):
        """Return the latest frame as a PIL image, or None if none was seen."""
        # Read the attribute once so the check and the use see the same frame.
        latest = self.snapshot
        if latest is None:
            return None
        # Frames are stored in BGR channel order (see transform); reverse the
        # channel axis so PIL, which treats 3-channel arrays as RGB, shows
        # correct colours. copy() makes the reversed view contiguous.
        return Image.fromarray(latest[:, :, ::-1].copy())
# Main App
st.title("SFT Tiny Titans 🚀 (Lean & Mean!)")

# Sidebar Galleries: one section per media kind, showing at most six files
# laid out across three columns.
st.sidebar.header("Media Gallery 🎨")
gallery_sections = [
    ("Images 📸", ["png", "jpg", "jpeg"], "🖼️"),
    ("Videos 🎥", ["mp4"], "🎬"),
]
for gallery_type, file_types, emoji in gallery_sections:
    st.sidebar.subheader(f"{gallery_type} {emoji}")
    files = get_gallery_files(file_types)
    if not files:
        continue
    cols = st.sidebar.columns(3)
    shows_images = "Images" in gallery_type
    shows_videos = "Videos" in gallery_type
    for idx, file in enumerate(files[:6]):
        with cols[idx % 3]:
            if shows_images:
                st.image(Image.open(file), caption=file.split('/')[-1], use_column_width=True)
            elif shows_videos:
                st.video(file)
# Sidebar Model Management
st.sidebar.subheader("Model Hub 🗂️")
model_type = st.sidebar.selectbox("Model Type", ["NLP (Causal LM)", "CV (Diffusion)"])
# The list of checkpoints offered depends on the model type chosen above.
if "NLP" in model_type:
    model_options = ["HuggingFaceTB/SmolLM-135M", "Qwen/Qwen1.5-0.5B-Chat"]
else:
    model_options = ["stabilityai/stable-diffusion-2-1", "CompVis/stable-diffusion-v1-4"]
selected_model = st.sidebar.selectbox("Select Model", ["None"] + model_options)
if selected_model != "None" and st.sidebar.button("Load Model 📂"):
    # Pick the builder/config pair that matches the chosen model type.
    if "NLP" in model_type:
        builder = ModelBuilder()
        config = ModelConfig(name=f"titan_{int(time.time())}", base_model=selected_model)
    else:
        builder = DiffusionBuilder()
        config = DiffusionConfig(name=f"titan_{int(time.time())}", base_model=selected_model)
    with st.spinner("Loading... ⏳"):
        builder.load_model(selected_model, config)
    # Keep the loaded builder across Streamlit reruns.
    st.session_state['builder'] = builder
    st.session_state['model_loaded'] = True
# Tabs
tab1, tab2, tab3, tab4 = st.tabs([
    "Build Titan 🌱",
    "Fine-Tune Titans 🔧",
    "Test Titans 🧪",
    "Camera Snap 📷"
])

with tab1:
    st.header("Build Titan 🌱 (Start Small!)")
    model_type = st.selectbox("Model Type", ["NLP (Causal LM)", "CV (Diffusion)"], key="build_type")
    # Offer the checkpoints that match the type picked in THIS tab. Reusing
    # the sidebar's list here could pair a diffusion builder with a language
    # model checkpoint (or the reverse) when the two selectors disagree.
    if "NLP" in model_type:
        build_options = ["HuggingFaceTB/SmolLM-135M", "Qwen/Qwen1.5-0.5B-Chat"]
    else:
        build_options = ["stabilityai/stable-diffusion-2-1", "CompVis/stable-diffusion-v1-4"]
    base_model = st.selectbox("Select Model", build_options, key="build_model")
    if st.button("Download Model ⬇️"):
        config = (ModelConfig if "NLP" in model_type else DiffusionConfig)(name=f"titan_{int(time.time())}", base_model=base_model)
        builder = ModelBuilder() if "NLP" in model_type else DiffusionBuilder()
        with st.spinner("Fetching... ⏳"):
            builder.load_model(base_model, config)
        # Keep the loaded builder across Streamlit reruns.
        st.session_state['builder'] = builder
        st.session_state['model_loaded'] = True
        st.success("Titan ready! 🎉")
def _read_sft_rows(uploaded_csv):
    """Parse an uploaded CSV into a list of ``{"prompt", "response"}`` dicts.

    The upload is decoded in memory instead of being written to a shared
    ``temp.csv`` on disk, so concurrent sessions cannot overwrite each other.

    Raises:
        ValueError: If the data is not valid UTF-8 or a required column is
            missing from the header.
    """
    # "utf-8-sig" also drops a leading byte-order mark, if present.
    text = uploaded_csv.getvalue().decode("utf-8-sig")
    reader = csv.DictReader(io.StringIO(text, newline=""))
    missing = {"prompt", "response"} - set(reader.fieldnames or [])
    if missing:
        raise ValueError(f"missing column(s): {', '.join(sorted(missing))}")
    return [{"prompt": row["prompt"], "response": row["response"]} for row in reader]


def _tune_nlp(builder, rows, epochs=3):
    """Fine-tune ``builder.model`` on prompt/response rows with AdamW."""
    import torch
    from torch.utils.data import DataLoader, Dataset

    tokenizer = builder.tokenizer
    model = builder.model
    max_length = 128

    class SFTDataset(Dataset):
        """Tokenizes "prompt response" pairs and masks non-response labels."""

        def __init__(self, data):
            self.data = data

        def __len__(self):
            return len(self.data)

        def __getitem__(self, idx):
            prompt = self.data[idx]["prompt"]
            response = self.data[idx]["response"]
            encoded = tokenizer(
                f"{prompt} {response}",
                return_tensors="pt",
                padding="max_length",
                max_length=max_length,
                truncation=True,
            )
            input_ids = encoded["input_ids"][0]
            attention_mask = encoded["attention_mask"][0]
            labels = input_ids.clone()
            # Without return_tensors the tokenizer returns a flat list of ids,
            # so its length is the prompt's token count. (The earlier code
            # indexed [0] first and called len() on an int.)
            prompt_len = min(len(tokenizer(prompt)["input_ids"]), max_length)
            # First real token; non-zero when the tokenizer pads on the left.
            start = int(attention_mask.argmax())
            labels[start:start + prompt_len] = -100
            # Padding reuses the EOS token id, so exclude it from the loss too.
            labels[attention_mask == 0] = -100
            return {"input_ids": input_ids, "attention_mask": attention_mask, "labels": labels}

    loader = DataLoader(SFTDataset(rows), batch_size=2)
    optimizer = torch.optim.AdamW(model.parameters(), lr=2e-5)
    device = model.device
    model.train()
    for _ in range(epochs):  # Simplified epochs
        for batch in loader:
            optimizer.zero_grad()
            outputs = model(**{k: v.to(device) for k, v in batch.items()})
            outputs.loss.backward()
            optimizer.step()
    # Leave the model in inference mode once training is finished.
    model.eval()


def _tune_cv(builder, images, texts, epochs=3, size=512):
    """Fine-tune the pipeline's UNet on (image, caption) pairs with AdamW.

    Only the UNet is optimized; the VAE and text encoder are used as fixed
    feature extractors.
    """
    import torch

    pipe = builder.pipeline
    device = pipe.device
    optimizer = torch.optim.AdamW(pipe.unet.parameters(), lr=1e-5)
    num_train_timesteps = pipe.scheduler.config.num_train_timesteps
    pipe.unet.train()
    for _ in range(epochs):  # Simplified epochs
        for img, text in zip(images, texts):
            optimizer.zero_grad()
            # No gradients are needed through the frozen VAE / text encoder.
            with torch.no_grad():
                # Resize to a fixed square so the spatial size is compatible
                # with the VAE/UNet downsampling, then map 0..255 to -1..1,
                # the input range the VAE expects.
                pixels = torch.from_numpy(np.array(img.resize((size, size))))
                pixels = pixels.permute(2, 0, 1).unsqueeze(0).float() / 127.5 - 1.0
                latents = pipe.vae.encode(pixels.to(device)).latent_dist.sample()
                latents = latents * pipe.vae.config.scaling_factor
                token_ids = pipe.tokenizer(
                    text,
                    padding="max_length",
                    max_length=pipe.tokenizer.model_max_length,
                    truncation=True,
                    return_tensors="pt",
                ).input_ids.to(device)
                text_emb = pipe.text_encoder(token_ids)[0]
            noise = torch.randn_like(latents)
            timesteps = torch.randint(0, num_train_timesteps, (1,), device=latents.device)
            noisy_latents = pipe.scheduler.add_noise(latents, noise, timesteps)
            pred_noise = pipe.unet(noisy_latents, timesteps, encoder_hidden_states=text_emb).sample
            # NOTE(review): the target is the added noise; checkpoints trained
            # with a different prediction type may need another target — confirm.
            loss = torch.nn.functional.mse_loss(pred_noise, noise)
            loss.backward()
            optimizer.step()
    # Leave the UNet in inference mode once training is finished.
    pipe.unet.eval()


with tab2:
    st.header("Fine-Tune Titans 🔧 (Sharpen Up!)")
    if 'builder' not in st.session_state or not st.session_state.get('model_loaded', False):
        st.warning("Load a Titan first! ⚠️")
    else:
        tune_builder = st.session_state['builder']
        if isinstance(tune_builder, ModelBuilder):
            st.subheader("NLP Tune 🧠")
            uploaded_csv = st.file_uploader("Upload CSV", type="csv", key="nlp_csv")
            if uploaded_csv and st.button("Tune NLP 🔄"):
                try:
                    data = _read_sft_rows(uploaded_csv)
                except (ValueError, csv.Error) as err:
                    st.error(f"Could not read CSV: {err}")
                else:
                    if not data:
                        st.warning("The CSV has no data rows. ⚠️")
                    else:
                        _tune_nlp(tune_builder, data)
                        st.success("NLP tuned! 🎉")
        elif isinstance(tune_builder, DiffusionBuilder):
            st.subheader("CV Tune 🎨")
            uploaded_files = st.file_uploader("Upload Images", type=["png", "jpg"], accept_multiple_files=True, key="cv_upload")
            text_input = st.text_area("Text (one per image)", "Bat Neon\nIron Glow", key="cv_text")
            if uploaded_files and st.button("Tune CV 🔄"):
                images = [Image.open(f).convert("RGB") for f in uploaded_files]
                texts = text_input.splitlines()[:len(images)]
                if len(texts) < len(images):
                    # zip() below stops at the shorter list; say so explicitly.
                    st.warning(f"Only {len(texts)} of {len(images)} images have a text line; the rest are skipped. ⚠️")
                _tune_cv(tune_builder, images, texts)
                for img, text in zip(images, texts):
                    filename = generate_filename(text)
                    img.save(filename)
                st.success("CV tuned! 🎉")
with tab3:
    st.header("Test Titans 🧪 (Showtime!)")
    # A Titan counts as ready when a builder is stored and flagged as loaded.
    titan_ready = 'builder' in st.session_state and st.session_state.get('model_loaded', False)
    if not titan_ready:
        st.warning("Load a Titan first! ⚠️")
    elif isinstance(st.session_state['builder'], ModelBuilder):
        st.subheader("NLP Test 🧠")
        prompt = st.text_area("Prompt", "What’s a superhero party?", key="nlp_test")
        if st.button("Test NLP ▶️"):
            result = st.session_state['builder'].evaluate(prompt)
            st.write(f"**Answer**: {result}")
    elif isinstance(st.session_state['builder'], DiffusionBuilder):
        st.subheader("CV Test 🎨")
        prompt = st.text_area("Prompt", "Neon Batman", key="cv_test")
        if st.button("Test CV ▶️"):
            with st.spinner("Generating... ⏳"):
                img = st.session_state['builder'].generate(prompt)
            st.image(img, caption="Generated Art")
with tab4:
    st.header("Camera Snap 📷 (Live Action!)")
    ctx = webrtc_streamer(key="camera", video_transformer_factory=VideoSnapshot, rtc_configuration={"iceServers": [{"urls": ["stun:stun.l.google.com:19302"]}]})
    # The snapshot controls are only shown while a transformer is attached.
    if ctx.video_transformer:
        snapshot_text = st.text_input("Snapshot Text", "Live Snap")
        if st.button("Snap It! 📸"):
            snapshot = ctx.video_transformer.take_snapshot()
            # Compare against None explicitly rather than relying on the
            # truthiness of the image object.
            if snapshot is not None:
                filename = generate_filename(snapshot_text)
                snapshot.save(filename)
                st.image(snapshot, caption=filename)
                st.success("Snapped! 🎉")
            else:
                # Tell the user instead of silently ignoring the click.
                st.warning("No camera frame available yet. Try again in a moment! ⚠️")
# Demo Dataset
st.subheader("Demo CV Dataset 🎨")
demo_texts = ["Bat Neon", "Iron Glow", "Thor Spark"]
# Use stable, timestamp-free file names. Timestamped names change on every
# rerun, so the existence check below never matched and three new blank
# images were written each time the script ran.
demo_images = [
    "demo_" + ''.join(c if c.isalnum() else '_' for c in t) + ".png"
    for t in demo_texts
]
for img, text in zip(demo_images, demo_texts):
    if not os.path.exists(img):
        # Placeholder 100x100 image so the listed file actually exists.
        Image.new("RGB", (100, 100)).save(img)
st.code(
    "\n".join(
        f"{i+1}. {demo_text} -> {demo_img}"
        for i, (demo_text, demo_img) in enumerate(zip(demo_texts, demo_images))
    ),
    language="text",
)
if st.button("Download Demo CSV 📝"):
    csv_path = f"demo_cv_{int(time.time())}.csv"
    with open(csv_path, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["image", "text"])
        writer.writerows(zip(demo_images, demo_texts))
    st.markdown(get_download_link(csv_path, "text/csv", "Download Demo CSV"), unsafe_allow_html=True)