import torch
import os
import requests
import logging
from pathlib import Path
from transformers import CLIPModel, CLIPProcessor, AutoTokenizer, MarianMTModel, MarianTokenizer
from diffusers import StableDiffusionPipeline, DPMSolverMultistepScheduler
import gradio as gr
from typing import Any, Callable, List, Optional, Tuple
from dataclasses import dataclass
# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
def download_model(model_url: str, model_path: str):
"""Download large model file with progress tracking."""
if not os.path.exists(model_path):
try:
logger.info(f"Downloading model from {model_url}...")
response = requests.get(model_url, stream=True)
response.raise_for_status()
total_size = int(response.headers.get('content-length', 0))
block_size = 1024 * 1024 # 1 MB chunks
downloaded_size = 0
with open(model_path, 'wb') as f:
for data in response.iter_content(block_size):
f.write(data)
downloaded_size += len(data)
progress = (downloaded_size / total_size) * 100 if total_size > 0 else 0
logger.info(f"Download progress: {progress:.2f}%")
logger.info("Model download complete.")
except Exception as e:
logger.error(f"Model download failed: {e}")
raise
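
# Example usage (hypothetical URL and path; nothing else in this file calls
# this helper, so it is shown only for illustration):
# download_model("https://example.com/banglaclip.ckpt", "model_cache/banglaclip.ckpt")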
@dataclass
class GenerationConfig:
    num_images: int = 1
    num_inference_steps: int = 50
    guidance_scale: float = 7.5
    seed: Optional[int] = None
class ModelCache:
    def __init__(self, cache_dir: Path):
        self.cache_dir = cache_dir
        self.cache_dir.mkdir(parents=True, exist_ok=True)

    def load_model(self, model_id: str, load_func: Callable, cache_name: str) -> Any:
        try:
            logger.info(f"Loading {cache_name}")
            # Route Hugging Face downloads into the local cache directory,
            # which the constructor creates but was otherwise never used.
            return load_func(model_id, cache_dir=str(self.cache_dir))
        except Exception as e:
            logger.error(f"Error loading model {cache_name}: {e}")
            raise
class EnhancedBanglaSDGenerator:
    def __init__(
        self,
        cache_dir: str,
        device: Optional[torch.device] = None
    ):
        self.device = device or torch.device("cuda" if torch.cuda.is_available() else "cpu")
        logger.info(f"Using device: {self.device}")
        self.cache = ModelCache(Path(cache_dir))
        self._initialize_models()
        self._load_context_data()

    def _load_banglaclip_model(self):
        """Load the BanglaCLIP model directly from Hugging Face."""
        try:
            model = CLIPModel.from_pretrained("Mansuba/BanglaCLIP13")
            return model.to(self.device)
        except Exception as e:
            logger.error(f"Failed to load BanglaCLIP model: {e}")
            raise
    def _initialize_models(self):
        try:
            # Translation models
            self.bn2en_model_name = "Helsinki-NLP/opus-mt-bn-en"
            self.translator = self.cache.load_model(
                self.bn2en_model_name,
                MarianMTModel.from_pretrained,
                "translator"
            ).to(self.device)
            self.trans_tokenizer = MarianTokenizer.from_pretrained(self.bn2en_model_name)

            # CLIP models
            self.clip_model_name = "openai/clip-vit-base-patch32"
            self.bangla_text_model = "csebuetnlp/banglabert"

            # Load the BanglaCLIP model directly from Hugging Face
            self.banglaclip_model = self._load_banglaclip_model()
            self.processor = CLIPProcessor.from_pretrained(self.clip_model_name)
            self.tokenizer = AutoTokenizer.from_pretrained(self.bangla_text_model)

            # Stable Diffusion
            self._initialize_stable_diffusion()
        except Exception as e:
            logger.error(f"Error initializing models: {e}")
            raise RuntimeError(f"Failed to initialize models: {e}") from e
    def _initialize_stable_diffusion(self):
        """Load and initialize the Stable Diffusion pipeline."""
        # Hedged sketch of the elided original; the checkpoint name is an assumption.
        self.pipe = StableDiffusionPipeline.from_pretrained("runwayml/stable-diffusion-v1-5")
        self.pipe.scheduler = DPMSolverMultistepScheduler.from_config(self.pipe.scheduler.config)
        self.pipe = self.pipe.to(self.device)
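
    # The three methods below are called elsewhere in this file but their
    # definitions were not included in it. These are hedged sketches under
    # stated assumptions, not the original implementations.

    def _load_context_data(self):
        """Sketch: the original context data is unknown, so start empty."""
        self.context_data = {}

    def generate_image(self, text: str, config: GenerationConfig) -> Tuple[List[Any], str]:
        """Translate a Bangla prompt to English, then run Stable Diffusion.

        Assumption: the original may also enrich the prompt using BanglaCLIP;
        this sketch performs plain translation plus a pipeline call.
        """
        inputs = self.trans_tokenizer(text, return_tensors="pt", padding=True).to(self.device)
        translated_ids = self.translator.generate(**inputs)
        prompt = self.trans_tokenizer.decode(translated_ids[0], skip_special_tokens=True)
        generator = None
        if config.seed is not None:
            generator = torch.Generator(device=self.device).manual_seed(config.seed)
        result = self.pipe(
            prompt,
            num_images_per_prompt=config.num_images,
            num_inference_steps=config.num_inference_steps,
            guidance_scale=config.guidance_scale,
            generator=generator,
        )
        return result.images, prompt

    def cleanup(self):
        """Release references to the loaded models and free GPU memory."""
        del self.pipe
        if torch.cuda.is_available():
            torch.cuda.empty_cache()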
def create_gradio_interface():
"""Create and configure the Gradio interface."""
cache_dir = Path("model_cache")
generator = None
def initialize_generator():
nonlocal generator
if generator is None:
generator = EnhancedBanglaSDGenerator(
cache_dir=str(cache_dir)
)
return generator
def cleanup_generator():
nonlocal generator
if generator is not None:
generator.cleanup()
generator = None
    def generate_images(text: str, num_images: int, steps: int, guidance_scale: float, seed: Optional[int]) -> Tuple[Optional[List[Any]], str]:
        if not text.strip():
            return None, "দয়া করে কিছু টেক্সট লিখুন"  # "Please enter some text"
        try:
            gen = initialize_generator()
            config = GenerationConfig(
                num_images=int(num_images),
                num_inference_steps=int(steps),
                guidance_scale=float(guidance_scale),
                seed=int(seed) if seed is not None else None
            )
            images, prompt = gen.generate_image(text, config)
            cleanup_generator()  # free model memory between requests
            return images, prompt
        except Exception as e:
            logger.error(f"Error in Gradio interface: {e}")
            cleanup_generator()
            return None, f"ছবি তৈরি ব্যর্থ হয়েছে: {e}"  # "Image generation failed"
    with gr.Blocks() as demo:
        text_input = gr.Textbox(label="Text", placeholder="Enter your prompt here...")
        num_images_input = gr.Slider(minimum=1, maximum=5, value=1, step=1, label="Number of Images")
        steps_input = gr.Slider(minimum=1, maximum=100, value=50, step=1, label="Steps")
        guidance_scale_input = gr.Slider(minimum=1, maximum=20, value=7.5, label="Guidance Scale")
        # gr.Number has no `optional` parameter; leave the field empty for no seed.
        seed_input = gr.Number(label="Seed", value=None, precision=0)
        output_images = gr.Gallery(label="Generated Images")
        output_prompt = gr.Textbox(label="Prompt / Status")
        generate_button = gr.Button("Generate Images")
        # generate_images returns (images, prompt), so wire up both outputs.
        generate_button.click(
            generate_images,
            inputs=[text_input, num_images_input, steps_input, guidance_scale_input, seed_input],
            outputs=[output_images, output_prompt],
        )
    return demo
if __name__ == "__main__":
    demo = create_gradio_interface()
    demo.queue().launch(share=True, debug=True)