Spaces:
Sleeping
Sleeping
import torch | |
import os | |
import requests | |
import logging | |
from pathlib import Path | |
from transformers import CLIPModel, CLIPProcessor, AutoTokenizer, MarianMTModel, MarianTokenizer | |
from diffusers import StableDiffusionPipeline, DPMSolverMultistepScheduler | |
import gradio as gr | |
from typing import List, Tuple, Optional, Any | |
from dataclasses import dataclass | |
# Configure logging | |
logging.basicConfig( | |
level=logging.INFO, | |
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' | |
) | |
logger = logging.getLogger(__name__) | |
def download_model(model_url: str, model_path: str): | |
"""Download large model file with progress tracking.""" | |
if not os.path.exists(model_path): | |
try: | |
logger.info(f"Downloading model from {model_url}...") | |
response = requests.get(model_url, stream=True) | |
response.raise_for_status() | |
total_size = int(response.headers.get('content-length', 0)) | |
block_size = 1024 * 1024 # 1 MB chunks | |
downloaded_size = 0 | |
with open(model_path, 'wb') as f: | |
for data in response.iter_content(block_size): | |
f.write(data) | |
downloaded_size += len(data) | |
progress = (downloaded_size / total_size) * 100 if total_size > 0 else 0 | |
logger.info(f"Download progress: {progress:.2f}%") | |
logger.info("Model download complete.") | |
except Exception as e: | |
logger.error(f"Model download failed: {e}") | |
raise | |
class GenerationConfig: | |
num_images: int = 1 | |
num_inference_steps: int = 50 | |
guidance_scale: float = 7.5 | |
seed: Optional[int] = None | |
class ModelCache: | |
def __init__(self, cache_dir: Path): | |
self.cache_dir = cache_dir | |
self.cache_dir.mkdir(parents=True, exist_ok=True) | |
def load_model(self, model_id: str, load_func: callable, cache_name: str) -> Any: | |
try: | |
logger.info(f"Loading {cache_name}") | |
return load_func(model_id) | |
except Exception as e: | |
logger.error(f"Error loading model {cache_name}: {str(e)}") | |
raise | |
class EnhancedBanglaSDGenerator: | |
def __init__( | |
self, | |
cache_dir: str, | |
device: Optional[torch.device] = None | |
): | |
self.device = device or torch.device("cuda" if torch.cuda.is_available() else "cpu") | |
logger.info(f"Using device: {self.device}") | |
self.cache = ModelCache(Path(cache_dir)) | |
self._initialize_models() | |
self._load_context_data() | |
def _load_banglaclip_model(self): | |
"""Load BanglaCLIP model from Hugging Face directly""" | |
try: | |
model = CLIPModel.from_pretrained("Mansuba/BanglaCLIP13") | |
return model.to(self.device) | |
except Exception as e: | |
logger.error(f"Failed to load BanglaCLIP model: {str(e)}") | |
raise | |
def _initialize_models(self): | |
try: | |
# Translation models | |
self.bn2en_model_name = "Helsinki-NLP/opus-mt-bn-en" | |
self.translator = self.cache.load_model( | |
self.bn2en_model_name, | |
MarianMTModel.from_pretrained, | |
"translator" | |
).to(self.device) | |
self.trans_tokenizer = MarianTokenizer.from_pretrained(self.bn2en_model_name) | |
# CLIP models | |
self.clip_model_name = "openai/clip-vit-base-patch32" | |
self.bangla_text_model = "csebuetnlp/banglabert" | |
# Load BanglaCLIP model directly from Hugging Face | |
self.banglaclip_model = self._load_banglaclip_model() | |
self.processor = CLIPProcessor.from_pretrained(self.clip_model_name) | |
self.tokenizer = AutoTokenizer.from_pretrained(self.bangla_text_model) | |
# Stable Diffusion | |
self._initialize_stable_diffusion() | |
except Exception as e: | |
logger.error(f"Error initializing models: {str(e)}") | |
raise RuntimeError(f"Failed to initialize models: {str(e)}") | |
def _initialize_stable_diffusion(self): | |
"""Load and initialize Stable Diffusion pipeline""" | |
pass # Your existing code for initializing Stable Diffusion | |
def create_gradio_interface(): | |
"""Create and configure the Gradio interface.""" | |
cache_dir = Path("model_cache") | |
generator = None | |
def initialize_generator(): | |
nonlocal generator | |
if generator is None: | |
generator = EnhancedBanglaSDGenerator( | |
cache_dir=str(cache_dir) | |
) | |
return generator | |
def cleanup_generator(): | |
nonlocal generator | |
if generator is not None: | |
generator.cleanup() | |
generator = None | |
def generate_images(text: str, num_images: int, steps: int, guidance_scale: float, seed: Optional[int]) -> Tuple[List[Any], str]: | |
if not text.strip(): | |
return None, "দয়া করে কিছু টেক্সট লিখুন" | |
try: | |
gen = initialize_generator() | |
config = GenerationConfig( | |
num_images=int(num_images), | |
num_inference_steps=int(steps), | |
guidance_scale=float(guidance_scale), | |
seed=int(seed) if seed else None | |
) | |
images, prompt = gen.generate_image(text, config) | |
cleanup_generator() | |
return images, prompt | |
except Exception as e: | |
logger.error(f"Error in Gradio interface: {str(e)}") | |
cleanup_generator() | |
return None, f"ছবি তৈরি ব্যর্থ হয়েছে: {str(e)}" | |
with gr.Blocks() as demo: | |
text_input = gr.Textbox(label="Text", placeholder="Enter your prompt here...") | |
num_images_input = gr.Slider(minimum=1, maximum=5, value=1, label="Number of Images") | |
steps_input = gr.Slider(minimum=1, maximum=100, value=50, label="Steps") | |
guidance_scale_input = gr.Slider(minimum=1, maximum=20, value=7.5, label="Guidance Scale") | |
seed_input = gr.Number(label="Seed", optional=True) | |
output_images = gr.Gallery(label="Generated Images") | |
generate_button = gr.Button("Generate Images") | |
generate_button.click(generate_images, inputs=[text_input, num_images_input, steps_input, guidance_scale_input, seed_input], outputs=[output_images]) | |
return demo | |
if __name__ == "__main__": | |
demo = create_gradio_interface() | |
demo.queue().launch(share=True, debug=True) | |