Spaces:
Running
Running
import os | |
import redis | |
import pickle | |
import torch | |
from PIL import Image | |
from diffusers import StableDiffusionPipeline, StableDiffusionImg2ImgPipeline, FluxPipeline, DiffusionPipeline, DPMSolverMultistepScheduler | |
from diffusers.utils import export_to_video | |
from transformers import pipeline as transformers_pipeline, AutoModelForCausalLM, AutoTokenizer, TrainingArguments, Trainer | |
from audiocraft.models import MusicGen | |
import gradio as gr | |
from huggingface_hub import snapshot_download, HfApi, HfFolder | |
import multiprocessing | |
import io | |
import time | |
# Obtener las variables de entorno | |
hf_token = os.getenv("HF_TOKEN") | |
redis_host = os.getenv("REDIS_HOST") | |
redis_port = int(os.getenv("REDIS_PORT", 6379)) # Valor predeterminado si no se proporciona | |
redis_password = os.getenv("REDIS_PASSWORD") | |
HfFolder.save_token(hf_token) | |
def connect_to_redis(): | |
while True: | |
try: | |
redis_client = redis.Redis(host=redis_host, port=redis_port, password=redis_password) | |
redis_client.ping() # Verifica si la conexión está activa | |
print("Connected to Redis successfully.") | |
return redis_client | |
except (redis.exceptions.ConnectionError, redis.exceptions.TimeoutError, BrokenPipeError) as e: | |
print(f"Connection to Redis failed: {e}. Retrying in 1 second...") | |
time.sleep(1) | |
def reconnect_if_needed(redis_client): | |
try: | |
redis_client.ping() | |
except (redis.exceptions.ConnectionError, redis.exceptions.TimeoutError, BrokenPipeError): | |
print("Reconnecting to Redis...") | |
return connect_to_redis() | |
return redis_client | |
def load_object_from_redis(key): | |
redis_client = connect_to_redis() | |
redis_client = reconnect_if_needed(redis_client) | |
try: | |
obj_data = redis_client.get(key) | |
return pickle.loads(obj_data) if obj_data else None | |
except (pickle.PickleError, redis.exceptions.RedisError) as e: | |
print(f"Failed to load object from Redis: {e}") | |
return None | |
def save_object_to_redis(key, obj): | |
redis_client = connect_to_redis() | |
redis_client = reconnect_if_needed(redis_client) | |
try: | |
redis_client.set(key, pickle.dumps(obj)) | |
print(f"Object saved to Redis: {key}") | |
except redis.exceptions.RedisError as e: | |
print(f"Failed to save object to Redis: {e}") | |
def get_model_or_download(model_id, redis_key, loader_func): | |
model = load_object_from_redis(redis_key) | |
if model: | |
print(f"Model loaded from Redis: {redis_key}") | |
return model | |
try: | |
model = loader_func(model_id, torch_dtype=torch.float16) | |
save_object_to_redis(redis_key, model) | |
print(f"Model downloaded and saved to Redis: {redis_key}") | |
except Exception as e: | |
print(f"Failed to load or save model: {e}") | |
return None | |
def generate_image(prompt): | |
redis_key = f"generated_image_{prompt}" | |
image = load_object_from_redis(redis_key) | |
if not image: | |
try: | |
image = text_to_image_pipeline(prompt).images[0] | |
save_object_to_redis(redis_key, image) | |
except Exception as e: | |
print(f"Failed to generate image: {e}") | |
return None | |
return image | |
def edit_image_with_prompt(image, prompt, strength=0.75): | |
redis_key = f"edited_image_{prompt}_{strength}" | |
edited_image = load_object_from_redis(redis_key) | |
if not edited_image: | |
try: | |
edited_image = img2img_pipeline(prompt=prompt, init_image=image.convert("RGB"), strength=strength).images[0] | |
save_object_to_redis(redis_key, edited_image) | |
except Exception as e: | |
print(f"Failed to edit image: {e}") | |
return None | |
return edited_image | |
def generate_song(prompt, duration=10): | |
redis_key = f"generated_song_{prompt}_{duration}" | |
song = load_object_from_redis(redis_key) | |
if not song: | |
try: | |
song = music_gen.generate(prompt, duration=duration) | |
save_object_to_redis(redis_key, song) | |
except Exception as e: | |
print(f"Failed to generate song: {e}") | |
return None | |
return song | |
def generate_text(prompt): | |
redis_key = f"generated_text_{prompt}" | |
text = load_object_from_redis(redis_key) | |
if not text: | |
try: | |
text = text_gen_pipeline([{"role": "user", "content": prompt}], max_new_tokens=256)[0]["generated_text"].strip() | |
save_object_to_redis(redis_key, text) | |
except Exception as e: | |
print(f"Failed to generate text: {e}") | |
return None | |
return text | |
def generate_flux_image(prompt): | |
redis_key = f"generated_flux_image_{prompt}" | |
flux_image = load_object_from_redis(redis_key) | |
if not flux_image: | |
try: | |
flux_image = flux_pipeline( | |
prompt, | |
guidance_scale=0.0, | |
num_inference_steps=4, | |
max_sequence_length=256, | |
generator=torch.Generator("cpu").manual_seed(0) | |
).images[0] | |
save_object_to_redis(redis_key, flux_image) | |
except Exception as e: | |
print(f"Failed to generate flux image: {e}") | |
return None | |
return flux_image | |
def generate_code(prompt): | |
redis_key = f"generated_code_{prompt}" | |
code = load_object_from_redis(redis_key) | |
if not code: | |
try: | |
inputs = starcoder_tokenizer.encode(prompt, return_tensors="pt").to("cuda") | |
outputs = starcoder_model.generate(inputs) | |
code = starcoder_tokenizer.decode(outputs[0]) | |
save_object_to_redis(redis_key, code) | |
except Exception as e: | |
print(f"Failed to generate code: {e}") | |
return None | |
return code | |
def generate_video(prompt): | |
redis_key = f"generated_video_{prompt}" | |
video = load_object_from_redis(redis_key) | |
if not video: | |
try: | |
pipe = DiffusionPipeline.from_pretrained("damo-vilab/text-to-video-ms-1.7b", torch_dtype=torch.float16) | |
pipe.scheduler = DPMSolverMultistepScheduler.from_config(pipe.scheduler.config) | |
pipe.enable_model_cpu_offload() | |
video = export_to_video(pipe(prompt, num_inference_steps=25).frames) | |
save_object_to_redis(redis_key, video) | |
except Exception as e: | |
print(f"Failed to generate video: {e}") | |
return None | |
return video | |
def test_model_meta_llama(): | |
redis_key = "meta_llama_test_response" | |
response = load_object_from_redis(redis_key) | |
if not response: | |
try: | |
messages = [ | |
{"role": "system", "content": "You are a pirate chatbot who always responds in pirate speak!"}, | |
{"role": "user", "content": "Who are you?"} | |
] | |
response = meta_llama_pipeline(messages, max_new_tokens=256)[0]["generated_text"].strip() | |
save_object_to_redis(redis_key, response) | |
except Exception as e: | |
print(f"Failed to test Meta-Llama: {e}") | |
return None | |
return response | |
def train_model(model, dataset, epochs, batch_size, learning_rate): | |
output_dir = io.BytesIO() | |
training_args = TrainingArguments( | |
output_dir=output_dir, | |
num_train_epochs=epochs, | |
per_device_train_batch_size=batch_size, | |
learning_rate=learning_rate, | |
) | |
trainer = Trainer(model=model, args=training_args, train_dataset=dataset) | |
try: | |
trainer.train() | |
save_object_to_redis("trained_model", model) | |
save_object_to_redis("training_results", output_dir.getvalue()) | |
except Exception as e: | |
print(f"Failed to train model: {e}") | |
def run_task(task_queue): | |
while True: | |
task = task_queue.get() | |
if task is None: | |
break | |
func, args, kwargs = task | |
try: | |
func(*args, **kwargs) | |
except Exception as e: | |
print(f"Failed to run task: {e}") | |
task_queue = multiprocessing.Queue() | |
num_processes = multiprocessing.cpu_count() | |
processes = [] | |
for _ in range(num_processes): | |
p = multiprocessing.Process(target=run_task, args=(task_queue,)) | |
p.start() | |
processes.append(p) | |
device = torch.device("cuda" if torch.cuda.is_available() else "cpu") | |
# Cargar modelos | |
text_to_image_pipeline = get_model_or_download("CompVis/stable-diffusion-v1-4", "text_to_image_model", StableDiffusionPipeline.from_pretrained) | |
img2img_pipeline = get_model_or_download("CompVis/stable-diffusion-v1-4", "img2img_model", StableDiffusionImg2ImgPipeline.from_pretrained) | |
flux_pipeline = get_model_or_download("black-forest-labs/FLUX.1-schnell", "flux_model", FluxPipeline.from_pretrained) | |
text_gen_pipeline = transformers_pipeline("text-generation", model="bigcode/starcoder", tokenizer="bigcode/starcoder", device=0) | |
music_gen = load_object_from_redis("music_gen") or MusicGen.from_pretrained('melody') | |
meta_llama_pipeline = get_model_or_download("meta-llama/Meta-Llama-3.1-8B-Instruct", "meta_llama_model", transformers_pipeline) | |
# Definir interfaces de usuario | |
gen_image_tab = gr.Interface(generate_image, gr.inputs.Textbox(label="Prompt:"), gr.outputs.Image(type="pil"), title="Generate Image") | |
edit_image_tab = gr.Interface(edit_image_with_prompt, [gr.inputs.Image(type="pil", label="Image:"), gr.inputs.Textbox(label="Prompt:"), gr.inputs.Slider(0.1, 1.0, 0.75, step=0.05, label="Strength:")], gr.outputs.Image(type="pil"), title="Edit Image") | |
generate_song_tab = gr.Interface(generate_song, [gr.inputs.Textbox(label="Prompt:"), gr.inputs.Slider(5, 60, 10, step=1, label="Duration (s):")], gr.outputs.Audio(type="numpy"), title="Generate Songs") | |
generate_text_tab = gr.Interface(generate_text, gr.inputs.Textbox(label="Prompt:"), gr.outputs.Textbox(label="Generated Text:"), title="Generate Text") | |
generate_flux_image_tab = gr.Interface(generate_flux_image, gr.inputs.Textbox(label="Prompt:"), gr.outputs.Image(type="pil"), title="Generate FLUX Images") | |
model_meta_llama_test_tab = gr.Interface(test_model_meta_llama, gr.inputs.Textbox(label="Test Input:"), gr.outputs.Textbox(label="Model Output:"), title="Test Meta-Llama") | |
app = gr.TabbedInterface( | |
[gen_image_tab, edit_image_tab, generate_song_tab, generate_text_tab, generate_flux_image_tab, model_meta_llama_test_tab], | |
["Generate Image", "Edit Image", "Generate Song", "Generate Text", "Generate FLUX Image", "Test Meta-Llama"] | |
) | |
app.launch(share=True) | |
for _ in range(num_processes): | |
task_queue.put(None) | |
for p in processes: | |
p.join() | |