File size: 35,509 Bytes
f0c19c8
 
 
 
 
 
 
 
 
 
 
01d7524
f0c19c8
 
 
 
 
 
 
01d7524
f0c19c8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6217849
f0c19c8
 
 
 
 
 
 
 
 
 
 
 
 
 
6217849
f0c19c8
 
 
 
 
 
 
 
6217849
f0c19c8
 
 
 
 
 
 
 
6217849
 
f0c19c8
 
6217849
 
 
f0c19c8
6217849
01d7524
6217849
f0c19c8
 
 
6217849
f0c19c8
6217849
f0c19c8
 
 
6217849
f0c19c8
6217849
 
 
 
f0c19c8
 
6217849
f0c19c8
 
 
6217849
 
 
 
 
 
 
 
 
f0c19c8
 
 
6217849
 
 
f0c19c8
6217849
 
f0c19c8
 
 
 
6217849
f0c19c8
6217849
 
 
f0c19c8
6217849
 
f0c19c8
6217849
f0c19c8
6217849
f0c19c8
6217849
 
 
f0c19c8
6217849
 
f0c19c8
 
 
6217849
f0c19c8
 
6217849
01d7524
6217849
f0c19c8
 
 
6217849
 
 
 
 
f0c19c8
6217849
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f0c19c8
6217849
 
 
 
 
 
 
 
f0c19c8
6217849
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
01d7524
6217849
 
 
 
 
 
 
f0c19c8
 
 
 
 
 
 
 
 
 
 
6217849
f0c19c8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6217849
 
 
f0c19c8
6217849
 
f0c19c8
 
 
 
6217849
f0c19c8
 
6217849
 
f0c19c8
6217849
 
f0c19c8
 
 
 
 
6217849
 
f0c19c8
 
 
6217849
f0c19c8
 
 
 
6b95d66
f0c19c8
 
6217849
f0c19c8
 
 
 
 
 
6217849
f0c19c8
 
 
6217849
f0c19c8
 
 
 
 
 
6217849
f0c19c8
 
 
6217849
f0c19c8
 
 
 
 
 
 
 
 
6217849
 
 
f0c19c8
 
6217849
f0c19c8
 
 
6217849
f0c19c8
 
 
 
 
 
6b95d66
f0c19c8
4c3762f
01d7524
 
9f7c23f
 
01d7524
9f7c23f
 
6b95d66
9f7c23f
6b95d66
9f7c23f
 
 
01d7524
 
 
9f7c23f
01d7524
4c3762f
f0c19c8
6217849
f0c19c8
 
6217849
f0c19c8
 
 
 
 
 
 
 
 
6b95d66
6217849
f0c19c8
6b95d66
f0c19c8
 
 
 
 
6b95d66
f0c19c8
 
6b95d66
f0c19c8
6b95d66
f0c19c8
 
 
 
 
 
 
 
 
 
 
 
 
 
6b95d66
f0c19c8
 
6217849
f0c19c8
6217849
f0c19c8
6217849
 
 
 
 
 
f0c19c8
 
6217849
 
f0c19c8
 
 
 
 
6217849
f0c19c8
6217849
 
f0c19c8
 
 
 
 
 
 
 
 
6217849
f0c19c8
01d7524
f0c19c8
 
6217849
 
 
 
 
 
f0c19c8
 
 
6217849
 
f0c19c8
 
 
 
 
6b95d66
4c3762f
6b95d66
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f0c19c8
 
6b95d66
 
 
 
 
 
 
 
400ee5c
6b95d66
400ee5c
 
 
 
 
 
6b95d66
f0c19c8
6b95d66
 
 
 
 
 
 
 
400ee5c
6b95d66
400ee5c
 
 
 
 
 
6b95d66
f0c19c8
 
6217849
 
f0c19c8
 
 
 
6217849
 
 
 
 
 
 
f0c19c8
 
6217849
f0c19c8
 
6217849
f0c19c8
 
6217849
 
 
f0c19c8
 
6217849
f0c19c8
6217849
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f0c19c8
6217849
f0c19c8
6217849
f0c19c8
 
6217849
 
 
 
f0c19c8
6217849
f0c19c8
6217849
 
 
 
f0c19c8
 
6217849
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f0c19c8
 
6217849
f0c19c8
 
 
 
 
6217849
 
f0c19c8
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
#!/usr/bin/env python3
import os
import glob
import base64
import streamlit as st
import pandas as pd
import torch
from torch.utils.data import Dataset, DataLoader
import csv
import time
from dataclasses import dataclass
from typing import Optional
import zipfile
import math
from PIL import Image
import random
import logging
import numpy as np
import cv2
from diffusers import DiffusionPipeline

logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
# In-memory copy of every record emitted through ``logger`` during this run.
log_records = []


class LogCaptureHandler(logging.Handler):
    """Logging handler that appends each record to the module-level ``log_records``."""

    def emit(self, record):
        log_records.append(record)


# ``logging.getLogger`` hands back the same logger object for the same name for
# the whole life of the process. If this module body runs more than once
# (Streamlit re-executes the script on every interaction), each run would stack
# one more capture handler and every record would be stored several times.
# The handler class is re-created on every run, so stale handlers are matched
# by class name rather than with isinstance().
for _stale_handler in [h for h in logger.handlers if type(h).__name__ == "LogCaptureHandler"]:
    logger.removeHandler(_stale_handler)
logger.addHandler(LogCaptureHandler())

# Browser-tab title and icon, wide layout, expanded sidebar and custom menu entries.
# The emoji in these literals had been stored as mis-decoded UTF-8 byte soup;
# they are restored to the intended characters here.
st.set_page_config(
    page_title="SFT Tiny Titans 🚀",
    page_icon="🤖",
    layout="wide",
    initial_sidebar_state="expanded",
    menu_items={
        'Get Help': 'https://huggingface.co/awacke1',
        'Report a Bug': 'https://huggingface.co/spaces/awacke1',
        'About': "Tiny Titans: Small diffusion models, big CV dreams! 🌌"
    }
)

# Seed the per-session defaults; keys that already exist are left untouched.
# NOTE(review): the emoji in the 'active_tab' default looks mis-encoded, but it is
# kept byte-for-byte because other code may compare against this exact value.
_SESSION_DEFAULTS = {
    'captured_images': [],
    'cv_builder': None,
    'cv_loaded': False,
    'active_tab': "Build Titan ๐ŸŒฑ",
}
for _state_key, _state_default in _SESSION_DEFAULTS.items():
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _state_default

@dataclass
class DiffusionConfig:
    """Config for our diffusion heroes 🦸 - holds ``name``, ``base_model`` and ``size``."""
    name: str
    base_model: str
    size: str

    @property
    def model_path(self):
        """Relative directory for this model: ``diffusion_models/<name>``."""
        return "diffusion_models/{}".format(self.name)

class DiffusionDataset(Dataset):
    """Pixel party platter 🍕 - pairs ``images[i]`` with ``texts[i]``.

    The length is taken from ``images``; each item is a dict with the keys
    ``"image"`` and ``"text"``.
    """

    def __init__(self, images, texts):
        self.images, self.texts = images, texts

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        return dict(image=self.images[idx], text=self.texts[idx])

def _single_sample(samples):
    """Collate function for ``batch_size=1``: return the lone sample unchanged.

    PyTorch's default collate cannot batch PIL images, so the training loop
    receives the raw ``{"image": ..., "text": ...}`` dict instead.
    """
    return samples[0]


class _TinyTitanBuilder:
    """Shared load / fine-tune / save / generate workflow for the builder classes.

    Subclasses supply only the user-facing wording (the class attributes
    below), their joke list and the number of inference steps.

    ``fine_tune_sft`` assumes the loaded pipeline exposes ``unet``, ``vae``,
    ``text_encoder``, ``tokenizer`` and ``scheduler`` attributes.
    """

    _jokes = ()
    _model_label = ""          # name used in log lines
    _tuned_label = ""          # name used in the "... tuned!" success message
    _load_note = ""            # parenthetical shown while loading
    _load_fail_note = ""       # parenthetical shown when loading fails
    _epoch_note = ""           # parenthetical shown while an epoch runs
    _tune_fail_note = ""       # parenthetical shown when tuning fails
    _save_note = ""            # parenthetical shown while saving
    _saved_note = ""           # sentence appended to the "Saved at ..." message
    _save_fail_note = ""       # parenthetical shown when saving fails
    _generate_fail_note = ""   # parenthetical shown when generation fails
    _inference_steps = 50

    def __init__(self):
        self.config = None
        self.pipeline = None
        self.jokes = list(self._jokes)

    def _customize_pipeline(self):
        """Hook run right after the pipeline is created; does nothing by default."""

    @staticmethod
    def _image_to_batch(image, device):
        """Convert one image into a float tensor of shape (1, C, H, W) on ``device``.

        Assumes ``image`` is channel-last (H, W, C): a PIL image, an ndarray or
        a tensor. Pixel values are passed through unscaled, as before.
        NOTE(review): Stable-Diffusion-style VAEs are usually fed values in
        [-1, 1] - confirm whether scaling is wanted here.
        """
        if torch.is_tensor(image):
            pixels = image.detach().cpu()
        else:
            pixels = torch.tensor(np.array(image))
        return pixels.permute(2, 0, 1).unsqueeze(0).float().to(device)

    def load_model(self, model_path: str, config: Optional[DiffusionConfig] = None):
        """Load a pipeline from ``model_path`` and move it to CUDA when available.

        Args:
            model_path: identifier or directory passed to ``from_pretrained``.
            config: optional config to remember on the builder.

        Returns:
            ``self``, so calls can be chained.

        Raises:
            Exception: any loading error is shown, logged and re-raised.
        """
        try:
            with st.spinner(f"Loading {model_path}... ⏳ ({self._load_note})"):
                self.pipeline = DiffusionPipeline.from_pretrained(model_path, low_cpu_mem_usage=True)
                self._customize_pipeline()
                self.pipeline.to("cuda" if torch.cuda.is_available() else "cpu")
                if config:
                    self.config = config
            st.success(f"Model loaded! 🎉 {random.choice(self.jokes)}")
            logger.info(f"Loaded {self._model_label}: {model_path}")
        except Exception as e:
            st.error(f"Failed to load {model_path}: {e} 💥 ({self._load_fail_note})")
            logger.error(f"Failed to load {model_path}: {e}")
            raise
        return self

    def fine_tune_sft(self, images, texts, epochs=3):
        """Fine-tune the pipeline's U-Net on image/text pairs with a noise-prediction MSE loss.

        Args:
            images: sequence of channel-last images (PIL, ndarray or tensor).
            texts: sequence of prompts, one per image.
            epochs: number of passes over the data.

        Returns:
            ``self``, so calls can be chained.

        Raises:
            ValueError: if no training samples are given.
            Exception: any other training error is shown, logged and re-raised.
        """
        try:
            dataset = DiffusionDataset(images, texts)
            if len(dataset) == 0:
                # Previously this surfaced as a ZeroDivisionError in the loss average.
                raise ValueError("No training images were provided")
            dataloader = DataLoader(dataset, batch_size=1, shuffle=True, collate_fn=_single_sample)
            pipe = self.pipeline
            optimizer = torch.optim.AdamW(pipe.unet.parameters(), lr=1e-5)
            pipe.unet.train()
            device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
            num_train_timesteps = pipe.scheduler.config.num_train_timesteps
            for epoch in range(epochs):
                with st.spinner(f"Epoch {epoch + 1}/{epochs}... ⚙️ ({self._epoch_note})"):
                    total_loss = 0.0
                    for sample in dataloader:
                        optimizer.zero_grad()
                        pixels = self._image_to_batch(sample["image"], device)
                        latents = pipe.vae.encode(pixels).latent_dist.sample()
                        noise = torch.randn_like(latents)
                        timesteps = torch.randint(0, num_train_timesteps, (latents.shape[0],), device=latents.device)
                        noisy_latents = pipe.scheduler.add_noise(latents, noise, timesteps)
                        token_ids = pipe.tokenizer(sample["text"], return_tensors="pt").input_ids.to(device)
                        text_embeddings = pipe.text_encoder(token_ids)[0]
                        pred_noise = pipe.unet(noisy_latents, timesteps, encoder_hidden_states=text_embeddings).sample
                        loss = torch.nn.functional.mse_loss(pred_noise, noise)
                        loss.backward()
                        optimizer.step()
                        total_loss += loss.item()
                    st.write(f"Epoch {epoch + 1} done! Loss: {total_loss / len(dataloader):.4f}")
            st.success(f"{self._tuned_label} tuned! 🎉 {random.choice(self.jokes)}")
            # A builder loaded without a config must not fail after a successful run.
            tuned_name = self.config.name if self.config else "<unnamed>"
            logger.info(f"Fine-tuned {self._model_label}: {tuned_name}")
        except Exception as e:
            st.error(f"Tuning failed: {e} 💥 ({self._tune_fail_note})")
            logger.error(f"Tuning failed: {e}")
            raise
        return self

    def save_model(self, path: str):
        """Save the pipeline to ``path``, creating the parent directory if needed.

        Raises:
            Exception: any saving error is shown, logged and re-raised.
        """
        try:
            with st.spinner(f"Saving model... 💾 ({self._save_note})"):
                parent = os.path.dirname(path)
                if parent:  # os.makedirs("") raises for a bare directory name
                    os.makedirs(parent, exist_ok=True)
                self.pipeline.save_pretrained(path)
            st.success(f"Saved at {path}! ✅ {self._saved_note}")
            logger.info(f"Saved at {path}")
        except Exception as e:
            st.error(f"Save failed: {e} 💥 ({self._save_fail_note})")
            logger.error(f"Save failed: {e}")
            raise

    def generate(self, prompt: str):
        """Run the pipeline on ``prompt`` and return the first generated image.

        Raises:
            Exception: any generation error is shown, logged and re-raised.
        """
        try:
            return self.pipeline(prompt, num_inference_steps=self._inference_steps).images[0]
        except Exception as e:
            st.error(f"Generation failed: {e} 💥 ({self._generate_fail_note})")
            logger.error(f"Generation failed: {e}")
            raise


class MicroDiffusionBuilder(_TinyTitanBuilder):
    """Tiny titan of diffusion 🐣 - Small but mighty for quick demos!"""

    _jokes = ("Micro but mighty! 💪", "Small pixels, big dreams! 🌟")
    _model_label = "Micro Diffusion"
    _tuned_label = "Micro Diffusion"
    _load_note = "Tiny titan powering up!"
    _load_fail_note = "Tiny titan tripped!"
    _epoch_note = "Micro titan flexing!"
    _tune_fail_note = "Micro snag!"
    _save_note = "Packing tiny pixels!"
    _saved_note = "Tiny titan secured!"
    _save_fail_note = "Packing mishap!"
    _generate_fail_note = "Pixel oopsie!"
    _inference_steps = 20


class LatentDiffusionBuilder(_TinyTitanBuilder):
    """Scaled-down dreamer 🌙 - Latent magic for efficient artistry!"""

    _jokes = ("Latent vibes only! 🌀", "Small scale, big style! 🎨")
    _model_label = "Latent Diffusion"
    _tuned_label = "Latent Diffusion"
    _load_note = "Latent titan rising!"
    _load_fail_note = "Latent hiccup!"
    _epoch_note = "Latent titan shaping up!"
    _tune_fail_note = "Latent snag!"
    _save_note = "Packing latent dreams!"
    _saved_note = "Latent titan stashed!"
    _save_fail_note = "Dreamy mishap!"
    _generate_fail_note = "Latent oopsie!"
    _inference_steps = 30

    def _customize_pipeline(self):
        """Replace the U-Net with its first two child modules."""
        # NOTE(review): nn.Sequential.forward takes a single positional input,
        # while fine_tune_sft calls the U-Net with (latents, timesteps,
        # encoder_hidden_states=...). This truncation looks incompatible with
        # training and generation - confirm the intent before relying on it.
        self.pipeline.unet = torch.nn.Sequential(*list(self.pipeline.unet.children())[:2])  # Scale down U-Net


class FluxDiffusionBuilder(_TinyTitanBuilder):
    """Distilled dynamo ⚡ - High-quality pixels in a small package!"""

    _jokes = ("Flux-tastic! ✨", "Small size, big wow! 🎇")
    _model_label = "FLUX.1 Distilled"
    _tuned_label = "FLUX Diffusion"
    _load_note = "Flux titan charging!"
    _load_fail_note = "Flux fizzle!"
    _epoch_note = "Flux titan powering up!"
    _tune_fail_note = "Flux snag!"
    _save_note = "Packing flux magic!"
    _saved_note = "Flux titan secured!"
    _save_fail_note = "Fluxy mishap!"
    _generate_fail_note = "Flux oopsie!"
    _inference_steps = 50

def generate_filename(sequence, ext="png"):
    """Time-stamped snapshots ⏰ - build ``<MM-DD-YYYY-HH-MM-SS-AM/PM>-<sequence>.<ext>``.

    The timestamp is the current time in the US/Central zone, 12-hour clock.
    """
    from datetime import datetime
    import pytz
    stamp = datetime.now(pytz.timezone('US/Central')).strftime('%m-%d-%Y-%I-%M-%S-%p')
    return "{}-{}.{}".format(stamp, sequence, ext)

def get_download_link(file_path, mime_type="text/plain", label="Download"):
    """Magic link maker 🔗 - build an ``<a>`` tag embedding a file as a base64 data URI.

    Args:
        file_path: path of the file to embed.
        mime_type: MIME type written into the data URI.
        label: link text; it is inserted as given, followed by a 📥 emoji.

    Returns:
        The anchor markup, or a plain ``"Error: ..."`` string when the file
        cannot be read (the failure is logged, not raised).
    """
    import html  # local import keeps this block self-contained

    try:
        with open(file_path, 'rb') as f:
            b64 = base64.b64encode(f.read()).decode()
        # The file name lands inside a double-quoted HTML attribute, so quotes,
        # ampersands and angle brackets in it must be escaped.
        download_name = html.escape(os.path.basename(file_path), quote=True)
        return f'<a href="data:{mime_type};base64,{b64}" download="{download_name}">{label} 📥</a>'
    except Exception as e:
        logger.error(f"Failed to generate link for {file_path}: {str(e)}")
        return f"Error: Could not generate link for {file_path}"

def zip_files(files, zip_path):
    """Zip zap zoo 🎒 - write ``files`` into a deflate-compressed archive at ``zip_path``.

    Each file is stored under its base name only. Any failure is logged and
    re-raised.
    """
    try:
        archive = zipfile.ZipFile(zip_path, mode='w', compression=zipfile.ZIP_DEFLATED)
        with archive:
            for member in files:
                archive.write(member, arcname=os.path.basename(member))
        logger.info(f"Created ZIP file: {zip_path}")
    except Exception as err:
        logger.error(f"Failed to create ZIP {zip_path}: {str(err)}")
        raise

def delete_files(files):
    """Trash titan 🗑️ - delete ``files`` from disk and forget them in session state.

    Args:
        files: iterable of paths to remove.

    Raises:
        Exception: the first deletion error is logged and re-raised; files
            listed after the failing one are not attempted.
    """
    removed = set()
    try:
        for file in files:
            os.remove(file)
            removed.add(file)
            logger.info(f"Deleted file: {file}")
    except Exception as e:
        logger.error(f"Failed to delete files: {str(e)}")
        raise
    finally:
        # Sync session state even when a later deletion fails, so entries for
        # files that are already gone do not linger in 'captured_images'.
        if removed:
            st.session_state['captured_images'] = [
                f for f in st.session_state['captured_images'] if f not in removed
            ]

def get_model_files():
    """Model treasure hunt 🗺️ - list the sub-directories of ``diffusion_models/``."""
    candidates = glob.glob("diffusion_models/*")
    return list(filter(os.path.isdir, candidates))

def get_gallery_files(file_types):
    """Gallery curator 🖼️ - sorted, de-duplicated files in the working directory.

    ``file_types`` is an iterable of extensions without the dot; each one is
    globbed as ``*.<ext>``.
    """
    matches = set()
    for extension in file_types:
        matches.update(glob.glob(f"*.{extension}"))
    return sorted(matches)

def update_gallery():
    """Gallery refresh 🌟 - show PNG files from the working directory in the sidebar.

    Renders up to ``gallery_size * 2`` images in two sidebar columns, each
    with a download link. ``gallery_size`` is read from module scope at call
    time. Images that cannot be opened are logged and skipped.
    """
    media_files = get_gallery_files(["png"])
    if not media_files:
        return
    cols = st.sidebar.columns(2)
    for idx, file in enumerate(media_files[:gallery_size * 2]):
        with cols[idx % 2]:
            try:
                # The context manager closes the file handle once the image is rendered.
                with Image.open(file) as img:
                    st.image(img, caption=file, use_container_width=True)
            except OSError as e:
                # One unreadable or half-written PNG should not take the whole page down.
                logger.warning(f"Skipping unreadable image {file}: {e}")
                continue
            st.markdown(get_download_link(file, "image/png", "Download Snap 📸"), unsafe_allow_html=True)

def get_available_video_devices(max_devices=6):
    """Probe camera indices with OpenCV and return their display labels.

    Each index in ``range(max_devices)`` is opened with the V4L2 backend
    first and with OpenCV's default backend as a fallback.

    Args:
        max_devices: Number of camera indices to probe, which is also the
            length of the default list. Defaults to 6.

    Returns:
        Labels of the cameras that opened. When none opened, or when probing
        raised, one default label per index is returned instead.
    """
    video_devices = [f"Camera {i} ๐ŸŽฅ" for i in range(max_devices)]
    try:
        detected = []
        for i in range(max_devices):
            cap = cv2.VideoCapture(i, cv2.CAP_V4L2)
            try:
                if not cap.isOpened():
                    # Drop the failed V4L2 handle before retrying.
                    cap.release()
                    cap = cv2.VideoCapture(i)
                if cap.isOpened():
                    detected.append(f"Camera {i} ๐ŸŽฅ")
                    logger.info(f"Detected camera at index {i}")
            finally:
                # Release on every path so a device is never left held.
                cap.release()
        if detected:
            video_devices = detected
        else:
            logger.warning("No cameras detected by OpenCV; using defaults")
    except Exception as e:
        logger.error(f"Error detecting cameras: {str(e)} - Falling back to defaults")
    return video_devices

# Page title.
st.title("SFT Tiny Titans ๐Ÿš€ (Small Diffusion Delight!)")

# Sidebar gallery header and size control. update_gallery() reads the
# module-level gallery_size, so the slider must be created before the call.
st.sidebar.header("Media Gallery ๐ŸŽจ")
gallery_size = st.sidebar.slider(
    label="Gallery Size ๐Ÿ“ธ",
    min_value=1,
    max_value=10,
    value=4,
    help="How many snaps to flaunt? ๐ŸŒŸ",
)
update_gallery()

# Bulk actions on the PNG snapshots: zip them for download, or delete them.
col1, col2 = st.sidebar.columns(2)
with col1:
    download_all_clicked = st.button("Download All ๐Ÿ“ฆ")
    if download_all_clicked:
        media_files = get_gallery_files(["png"])
        if not media_files:
            st.sidebar.warning("No snaps to zip! ๐Ÿ“ธ Snap some first!")
        else:
            # Timestamped name so successive archives never overwrite each other.
            zip_path = f"snapshot_collection_{int(time.time())}.zip"
            zip_files(media_files, zip_path)
            archive_link = get_download_link(zip_path, "application/zip", "Download All Snaps ๐Ÿ“ฆ")
            st.sidebar.markdown(archive_link, unsafe_allow_html=True)
            st.sidebar.success("Snaps zipped! ๐ŸŽ‰ Grab your loot!")
with col2:
    delete_all_clicked = st.button("Delete All ๐Ÿ—‘๏ธ")
    if delete_all_clicked:
        media_files = get_gallery_files(["png"])
        if not media_files:
            st.sidebar.warning("Nothing to delete! ๐Ÿ“ธ Snap some pics!")
        else:
            delete_files(media_files)
            st.sidebar.success("Snaps vanquished! ๐Ÿงน Gallery cleared!")
            update_gallery()

# Sidebar uploader: every uploaded file is written into the working directory.
uploaded_files = st.sidebar.file_uploader("Upload Goodies ๐ŸŽต๐ŸŽฅ๐Ÿ–ผ๏ธ๐Ÿ“๐Ÿ“œ", type=["mp3", "mp4", "png", "jpeg", "md", "pdf", "docx"], accept_multiple_files=True)
if uploaded_files:
    for uploaded_file in uploaded_files:
        # The file name comes from the client. Keep only its last path
        # component so a crafted name such as "../../x.png" cannot write
        # outside the working directory.
        filename = os.path.basename(uploaded_file.name.replace("\\", "/"))
        if filename in ("", ".", ".."):
            logger.warning(f"Skipped upload with unusable name: {uploaded_file.name!r}")
            continue
        with open(filename, "wb") as f:
            f.write(uploaded_file.getvalue())
        logger.info(f"Uploaded file: {filename}")

# Sidebar image gallery: PNG and JPEG files from the working directory, at
# most gallery_size * 2 of them, in a two-column grid with download links.
st.sidebar.subheader("Image Gallery ๐Ÿ–ผ๏ธ")
image_files = get_gallery_files(["png", "jpeg"])
if image_files:
    cols = st.sidebar.columns(2)
    for idx, file in enumerate(image_files[:gallery_size * 2]):
        with cols[idx % 2]:
            try:
                # Close the handle once rendered; Image.open() alone leaves it open.
                with Image.open(file) as img:
                    st.image(img, caption=file, use_container_width=True)
            except OSError as e:
                # An uploaded file with an image extension may not be a real
                # image; skip it instead of aborting the whole script run.
                logger.warning(f"Skipping unreadable image {file}: {str(e)}")
                continue
            st.markdown(get_download_link(file, "image/png" if file.endswith(".png") else "image/jpeg", "Save Pic ๐Ÿ–ผ๏ธ"), unsafe_allow_html=True)

# Sidebar model loader: choose a saved model directory and a diffusion type,
# then load it with the matching builder and keep the builder in session state.
st.sidebar.subheader("Model Management ๐Ÿ—‚๏ธ")
model_dirs = get_model_files()
selected_model = st.sidebar.selectbox("Select Saved Model", ["None", *model_dirs])
model_type = st.sidebar.selectbox(
    "Diffusion Type",
    ["Micro Diffusion", "Latent Diffusion", "FLUX.1 Distilled"],
)
# The load button is only rendered once a real model directory is selected.
if selected_model != "None":
    if st.sidebar.button("Load Model ๐Ÿ“‚"):
        sidebar_builder_types = {
            "Micro Diffusion": MicroDiffusionBuilder,
            "Latent Diffusion": LatentDiffusionBuilder,
            "FLUX.1 Distilled": FluxDiffusionBuilder,
        }
        builder_type = sidebar_builder_types[model_type]
        builder = builder_type()
        config = DiffusionConfig(
            name=os.path.basename(selected_model),
            base_model="unknown",
            size="small",
        )
        try:
            builder.load_model(selected_model, config)
            st.session_state['cv_builder'] = builder
            st.session_state['cv_loaded'] = True
            st.rerun()
        except Exception as e:
            st.error(f"Model load failed: {e!s} ๐Ÿ’ฅ (Check logs for details!)")

# Sidebar status line: whether a CV model is loaded, and whether the stored
# builder is one of the known diffusion builder classes ("Active").
st.sidebar.subheader("Model Status ๐Ÿšฆ")
cv_is_loaded = st.session_state['cv_loaded']
cv_is_active = st.session_state['cv_loaded'] and isinstance(
    st.session_state.get('cv_builder'),
    (MicroDiffusionBuilder, LatentDiffusionBuilder, FluxDiffusionBuilder),
)
loaded_text = 'Loaded' if cv_is_loaded else 'Not Loaded'
active_text = '(Active)' if cv_is_active else ''
st.sidebar.write(f"**CV Model**: {loaded_text} {active_text}")

# Main tab strip. The labels double as the values stored in
# st.session_state['active_tab'].
tabs = ["Build Titan ๐ŸŒฑ", "Camera Snap ๐Ÿ“ท", "Fine-Tune Titan (CV) ๐Ÿ”ง", "Test Titan (CV) ๐Ÿงช", "Agentic RAG Party (CV) ๐ŸŒ"]
tab1, tab2, tab3, tab4, tab5 = st.tabs(tabs)

# Per-tab bookkeeping: a tab flagged active that is not the recorded active
# tab becomes the active one (and is logged); each flag is then refreshed.
for tab_index, tab_label in enumerate(tabs):
    flag_key = f'tab{tab_index}_active'
    is_current = st.session_state['active_tab'] == tab_label
    if not is_current and st.session_state.get(flag_key, False):
        logger.info(f"Switched to tab: {tab_label}")
        st.session_state['active_tab'] = tab_label
        is_current = True
    st.session_state[flag_key] = is_current

# Tab 1: download a base checkpoint, save it locally and make it the active model.
with tab1:
    st.header("Build Titan ๐ŸŒฑ")
    model_type = st.selectbox(
        "Diffusion Type",
        ["Micro Diffusion", "Latent Diffusion", "FLUX.1 Distilled"],
        key="build_type",
    )
    # Each diffusion type offers exactly one base checkpoint; any type that
    # is not Micro or Latent falls through to the FLUX checkpoint.
    checkpoint_by_type = {
        "Micro Diffusion": "CompVis/ldm-text2im-large-256",
        "Latent Diffusion": "runwayml/stable-diffusion-v1-5",
    }
    checkpoint_choice = checkpoint_by_type.get(model_type, "black-forest-labs/flux.1-distilled")
    base_model = st.selectbox("Select Tiny Model", [checkpoint_choice])
    model_name = st.text_input("Model Name", f"tiny-titan-{int(time.time())}")
    if st.button("Download Model โฌ‡๏ธ"):
        config = DiffusionConfig(name=model_name, base_model=base_model, size="small")
        build_builder_types = {
            "Micro Diffusion": MicroDiffusionBuilder,
            "Latent Diffusion": LatentDiffusionBuilder,
            "FLUX.1 Distilled": FluxDiffusionBuilder,
        }
        builder_type = build_builder_types[model_type]
        builder = builder_type()
        try:
            builder.load_model(base_model, config)
            builder.save_model(config.model_path)
            st.session_state['cv_builder'] = builder
            st.session_state['cv_loaded'] = True
            st.rerun()
        except Exception as e:
            st.error(f"Model build failed: {e!s} ๐Ÿ’ฅ (Check logs for details!)")

def _save_snapshot_once(cam_key, snapshot):
    """Write a camera snapshot to disk unless the same frame is already saved.

    ``st.camera_input`` keeps returning the last captured frame on every
    script rerun, so saving unconditionally creates a new file on every
    widget interaction. The digest and filename of the last saved frame are
    kept in session state and reused while the frame is unchanged and its
    file still exists.

    Args:
        cam_key: Camera identifier ("cam0" or "cam1"); passed to
            ``generate_filename`` and used to namespace the session-state entry.
        snapshot: Value returned by ``st.camera_input`` (exposes ``getvalue()``).

    Returns:
        Tuple ``(filename, is_new)``: the snapshot path and whether the file
        was written during this run.
    """
    import hashlib

    payload = snapshot.getvalue()
    digest = hashlib.sha256(payload).hexdigest()
    state_key = f"{cam_key}_last_snapshot"
    remembered = st.session_state.get(state_key)
    if remembered and remembered[0] == digest and os.path.exists(remembered[1]):
        return remembered[1], False
    filename = generate_filename(cam_key)
    with open(filename, "wb") as f:
        f.write(payload)
    st.session_state[state_key] = (digest, filename)
    return filename, True


# Tab 2: two camera capture widgets with editable label, hint and visibility.
with tab2:
    st.header("Camera Snap ๐Ÿ“ท (Dual Capture Fiesta!)")
    video_devices = get_available_video_devices()
    st.write(f"๐ŸŽ‰ Detected Cameras: {', '.join(video_devices)}")
    st.info("Switch cams in your browser settings (e.g., Chrome > Privacy > Camera) since Iโ€™m a browser star! ๐ŸŒŸ")

    st.subheader("Camera 0 ๐ŸŽฌ - Lights, Camera, Action!")
    cam0_cols = st.columns(4)
    with cam0_cols[0]:
        cam0_device = st.selectbox("Cam ๐Ÿ“ท", video_devices, index=0, key="cam0_device", help="Pick your star cam! ๐ŸŒŸ")
    with cam0_cols[1]:
        cam0_label = st.text_input("Tag ๐Ÿท๏ธ", "Cam 0 Snap", key="cam0_label", help="Name your masterpiece! ๐ŸŽจ")
    with cam0_cols[2]:
        cam0_help = st.text_input("Hint ๐Ÿ’ก", "Snap a heroic moment! ๐Ÿฆธโ€โ™‚๏ธ", key="cam0_help", help="Give a fun tip!")
    with cam0_cols[3]:
        cam0_vis = st.selectbox("Show ๐Ÿ–ผ๏ธ", ["visible", "hidden", "collapsed"], index=0, key="cam0_vis", help="Label vibes: Visible, Sneaky, or Gone!")

    st.subheader("Camera 1 ๐ŸŽฅ - Roll the Film!")
    cam1_cols = st.columns(4)
    with cam1_cols[0]:
        # Default the second widget to the second device when one exists.
        cam1_device = st.selectbox("Cam ๐Ÿ“ท", video_devices, index=1 if len(video_devices) > 1 else 0, key="cam1_device", help="Choose your blockbuster cam! ๐ŸŽฌ")
    with cam1_cols[1]:
        cam1_label = st.text_input("Tag ๐Ÿท๏ธ", "Cam 1 Snap", key="cam1_label", help="Title your epic shot! ๐ŸŒ ")
    with cam1_cols[2]:
        cam1_help = st.text_input("Hint ๐Ÿ’ก", "Grab an epic frame! ๐ŸŒŸ", key="cam1_help", help="Drop a cheeky hint!")
    with cam1_cols[3]:
        cam1_vis = st.selectbox("Show ๐Ÿ–ผ๏ธ", ["visible", "hidden", "collapsed"], index=0, key="cam1_vis", help="Label style: Show it, Hide it, Poof!")

    cols = st.columns(2)
    with cols[0]:
        st.subheader(f"Camera 0 ({cam0_device}) ๐ŸŽฌ")
        cam0_img = st.camera_input(
            label=cam0_label,
            key="cam0",
            help=cam0_help,
            disabled=False,
            label_visibility=cam0_vis
        )
        if cam0_img:
            filename, is_new = _save_snapshot_once("cam0", cam0_img)
            with Image.open(filename) as snap:
                st.image(snap, caption=filename, use_container_width=True)
            # Log, record and refresh only for a frame saved in this run.
            if is_new:
                logger.info(f"Saved snapshot from Camera 0: {filename}")
                st.session_state['captured_images'].append(filename)
                update_gallery()
        st.info("๐Ÿšจ One snap at a timeโ€”your Titanโ€™s too cool for bursts! ๐Ÿ˜Ž")
    with cols[1]:
        st.subheader(f"Camera 1 ({cam1_device}) ๐ŸŽฅ")
        cam1_img = st.camera_input(
            label=cam1_label,
            key="cam1",
            help=cam1_help,
            disabled=False,
            label_visibility=cam1_vis
        )
        if cam1_img:
            filename, is_new = _save_snapshot_once("cam1", cam1_img)
            with Image.open(filename) as snap:
                st.image(snap, caption=filename, use_container_width=True)
            if is_new:
                logger.info(f"Saved snapshot from Camera 1: {filename}")
                st.session_state['captured_images'].append(filename)
                update_gallery()
        st.info("๐Ÿšจ Single shots onlyโ€”craft your masterpiece! ๐ŸŽจ")

def _snapshot_tag(image_path):
    """Return the label used in the default caption for a snapshot.

    This is the fifth dash-separated part of the file name when present.
    Names with fewer parts (for example an uploaded ``photo.png``) fall back
    to the file name without its extension instead of raising IndexError.
    """
    base = os.path.basename(image_path)
    parts = base.split('-')
    if len(parts) > 4:
        return parts[4]
    return os.path.splitext(base)[0]


# Tab 3: three fine-tuning use cases built from the PNG snapshots. Each offers
# an editable image/text table, a fine-tune button and a dataset export.
with tab3:
    st.header("Fine-Tune Titan (CV) ๐Ÿ”ง (Sculpt Your Pixel Prodigy!)")
    if not st.session_state['cv_loaded'] or not isinstance(st.session_state['cv_builder'], (MicroDiffusionBuilder, LatentDiffusionBuilder, FluxDiffusionBuilder)):
        st.warning("Please build or load a CV Titan first! โš ๏ธ (No artist, no canvas!)")
    else:
        captured_images = get_gallery_files(["png"])
        # Nothing is rendered until at least two snapshots exist.
        if len(captured_images) >= 2:
            st.subheader("Use Case 1: Denoise Snapshots ๐ŸŒŸ")
            # Default table: at most the first ten snapshots.
            denoising_data = [{"image": img, "text": f"Denoised {_snapshot_tag(img)} snap"} for img in captured_images[:10]]
            denoising_edited = st.data_editor(pd.DataFrame(denoising_data), num_rows="dynamic", help="Craft denoising pairs! ๐ŸŒŸ")
            if st.button("Fine-Tune Denoising ๐Ÿ”„"):
                images = [Image.open(row["image"]) for _, row in denoising_edited.iterrows()]
                texts = [row["text"] for _, row in denoising_edited.iterrows()]
                new_model_name = f"{st.session_state['cv_builder'].config.name}-denoise-{int(time.time())}"
                new_config = DiffusionConfig(name=new_model_name, base_model=st.session_state['cv_builder'].config.base_model, size="small")
                # The builder adopts the new config before tuning and saving.
                st.session_state['cv_builder'].config = new_config
                with st.status("Fine-tuning for denoising... โณ (Polishing pixels!)", expanded=True) as status:
                    st.session_state['cv_builder'].fine_tune_sft(images, texts)
                    st.session_state['cv_builder'].save_model(new_config.model_path)
                    status.update(label="Denoising tuned! ๐ŸŽ‰ (Pixel shine unleashed!)", state="complete")
                zip_path = f"{new_config.model_path}.zip"
                zip_files([new_config.model_path], zip_path)
                st.markdown(get_download_link(zip_path, "application/zip", "Download Denoised Titan ๐Ÿ“ฆ"), unsafe_allow_html=True)
            # The dataset export is written on every run, not only on click.
            denoising_csv = f"denoise_dataset_{int(time.time())}.csv"
            with open(denoising_csv, "w", newline="") as f:
                writer = csv.writer(f)
                writer.writerow(["image", "text"])
                for _, row in denoising_edited.iterrows():
                    writer.writerow([row["image"], row["text"]])
            st.markdown(get_download_link(denoising_csv, "text/csv", "Download Denoising CSV ๐Ÿ“œ"), unsafe_allow_html=True)

            st.subheader("Use Case 2: Stylize Snapshots ๐ŸŽจ")
            stylize_data = [{"image": img, "text": f"Neon {_snapshot_tag(img)} style"} for img in captured_images[:10]]
            stylize_edited = st.data_editor(pd.DataFrame(stylize_data), num_rows="dynamic", help="Craft stylized pairs! ๐ŸŽจ")
            if st.button("Fine-Tune Stylization ๐Ÿ”„"):
                images = [Image.open(row["image"]) for _, row in stylize_edited.iterrows()]
                texts = [row["text"] for _, row in stylize_edited.iterrows()]
                new_model_name = f"{st.session_state['cv_builder'].config.name}-stylize-{int(time.time())}"
                new_config = DiffusionConfig(name=new_model_name, base_model=st.session_state['cv_builder'].config.base_model, size="small")
                st.session_state['cv_builder'].config = new_config
                with st.status("Fine-tuning for stylization... โณ (Painting pixels!)", expanded=True) as status:
                    st.session_state['cv_builder'].fine_tune_sft(images, texts)
                    st.session_state['cv_builder'].save_model(new_config.model_path)
                    status.update(label="Stylization tuned! ๐ŸŽ‰ (Pixel art unleashed!)", state="complete")
                zip_path = f"{new_config.model_path}.zip"
                zip_files([new_config.model_path], zip_path)
                st.markdown(get_download_link(zip_path, "application/zip", "Download Stylized Titan ๐Ÿ“ฆ"), unsafe_allow_html=True)
            stylize_md = f"stylize_dataset_{int(time.time())}.md"
            with open(stylize_md, "w") as f:
                f.write("# Stylization Dataset\n\n")
                for _, row in stylize_edited.iterrows():
                    f.write(f"- `{row['image']}`: {row['text']}\n")
            st.markdown(get_download_link(stylize_md, "text/markdown", "Download Stylization MD ๐Ÿ“"), unsafe_allow_html=True)

            st.subheader("Use Case 3: Multi-Angle Snapshots ๐ŸŒ")
            multiangle_data = [{"image": img, "text": f"View from {_snapshot_tag(img)}"} for img in captured_images[:10]]
            multiangle_edited = st.data_editor(pd.DataFrame(multiangle_data), num_rows="dynamic", help="Craft multi-angle pairs! ๐ŸŒ")
            if st.button("Fine-Tune Multi-Angle ๐Ÿ”„"):
                images = [Image.open(row["image"]) for _, row in multiangle_edited.iterrows()]
                texts = [row["text"] for _, row in multiangle_edited.iterrows()]
                new_model_name = f"{st.session_state['cv_builder'].config.name}-multiangle-{int(time.time())}"
                new_config = DiffusionConfig(name=new_model_name, base_model=st.session_state['cv_builder'].config.base_model, size="small")
                st.session_state['cv_builder'].config = new_config
                with st.status("Fine-tuning for multi-angle... โณ (Spinning pixels!)", expanded=True) as status:
                    st.session_state['cv_builder'].fine_tune_sft(images, texts)
                    st.session_state['cv_builder'].save_model(new_config.model_path)
                    status.update(label="Multi-angle tuned! ๐ŸŽ‰ (Pixel views unleashed!)", state="complete")
                zip_path = f"{new_config.model_path}.zip"
                zip_files([new_config.model_path], zip_path)
                st.markdown(get_download_link(zip_path, "application/zip", "Download Multi-Angle Titan ๐Ÿ“ฆ"), unsafe_allow_html=True)
            multiangle_csv = f"multiangle_dataset_{int(time.time())}.csv"
            with open(multiangle_csv, "w", newline="") as f:
                writer = csv.writer(f)
                writer.writerow(["image", "text"])
                for _, row in multiangle_edited.iterrows():
                    writer.writerow([row["image"], row["text"]])
            st.markdown(get_download_link(multiangle_csv, "text/csv", "Download Multi-Angle CSV ๐Ÿ“œ"), unsafe_allow_html=True)

# Tab 4: generate one image from a free-text prompt with the loaded model.
with tab4:
    st.header("Test Titan (CV) ๐Ÿงช (Unleash Your Pixel Power!)")
    titan_ready = st.session_state['cv_loaded'] and isinstance(
        st.session_state['cv_builder'],
        (MicroDiffusionBuilder, LatentDiffusionBuilder, FluxDiffusionBuilder),
    )
    if titan_ready:
        st.subheader("Test Your Titan ๐ŸŽจ")
        test_prompt = st.text_area(
            "Prompt ๐ŸŽค",
            "Neon glow from cam0",
            help="Dream up a wild imageโ€”your Titanโ€™s ready to paint! ๐Ÿ–Œ๏ธ",
        )
        if st.button("Generate โ–ถ๏ธ"):
            with st.spinner("Crafting your masterpiece... โณ (Titanโ€™s mixing pixels!)"):
                image = st.session_state['cv_builder'].generate(test_prompt)
                st.image(image, caption=f"Generated: {test_prompt}", use_container_width=True)
    else:
        st.warning("Please build or load a CV Titan first! โš ๏ธ (No artist, no masterpiece!)")

# Tab 5: demo that renders a fixed list of party-themed prompts with the
# loaded model's pipeline.
with tab5:
    st.header("Agentic RAG Party (CV) ๐ŸŒ (Pixel Party Extravaganza!)")
    st.write("Generate superhero party vibes from your tuned Titan! ๐ŸŽ‰")
    titan_ready = st.session_state['cv_loaded'] and isinstance(
        st.session_state['cv_builder'],
        (MicroDiffusionBuilder, LatentDiffusionBuilder, FluxDiffusionBuilder),
    )
    if not titan_ready:
        st.warning("Please build or load a CV Titan first! โš ๏ธ (No artist, no party!)")
    elif st.button("Run RAG Demo ๐ŸŽ‰"):
        with st.spinner("Loading your pixel party titan... โณ (Titanโ€™s grabbing its brush!)"):
            class CVPartyAgent:
                """Thin wrapper around a diffusion pipeline with a canned party plan."""

                def __init__(self, pipeline):
                    self.pipeline = pipeline

                def generate(self, prompt: str) -> Image.Image:
                    """Run the pipeline for 50 inference steps and return the first image."""
                    result = self.pipeline(prompt, num_inference_steps=50)
                    return result.images[0]

                def plan_party(self):
                    """Return a DataFrame with one numbered scene per canned prompt."""
                    scene_ideas = (
                        "Gold-plated Batman statue from cam0",
                        "VR superhero battle scene from cam1",
                        "Neon-lit Avengers tower from cam2",
                    )
                    rows = []
                    for number, idea in enumerate(scene_ideas, start=1):
                        rows.append({"Theme": f"Scene {number}", "Image Idea": idea})
                    return pd.DataFrame(rows)

            agent = CVPartyAgent(st.session_state['cv_builder'].pipeline)
            st.write("Party agent ready! ๐ŸŽจ (Time to paint an epic bash!)")
        with st.spinner("Crafting superhero party visuals... โณ (Pixels assemble!)"):
            try:
                plan_df = agent.plan_party()
                st.dataframe(plan_df)
                for theme, idea in zip(plan_df["Theme"], plan_df["Image Idea"]):
                    image = agent.generate(idea)
                    st.image(image, caption=f"{theme} - {idea}", use_container_width=True)
            except Exception as e:
                st.error(f"Party crashed: {e!s} ๐Ÿ’ฅ (Pixel oopsie!)")
                logger.error(f"RAG demo failed: {e!s}")

# Sidebar log panel: one line per collected log record.
st.sidebar.subheader("Action Logs ๐Ÿ“œ")
log_container = st.sidebar.empty()
# st.empty() holds a single element, so writing records straight into it
# would leave only the last one visible. Nest a container inside the
# placeholder so every record is shown.
with log_container.container():
    for record in log_records:
        st.write(f"{record.asctime} - {record.levelname} - {record.message}")

# Final gallery refresh at the end of the script run.
update_gallery()