import os import redis import pickle import torch from PIL import Image from diffusers import StableDiffusionPipeline, StableDiffusionImg2ImgPipeline, FluxPipeline, DiffusionPipeline, DPMSolverMultistepScheduler from diffusers.utils import export_to_video from transformers import pipeline as transformers_pipeline, AutoModelForCausalLM, AutoTokenizer, TrainingArguments, Trainer from audiocraft.models import MusicGen import gradio as gr from huggingface_hub import snapshot_download, HfApi, HfFolder import multiprocessing import io import time # Obtener las variables de entorno hf_token = os.getenv("HF_TOKEN") redis_host = os.getenv("REDIS_HOST") redis_port = int(os.getenv("REDIS_PORT", 6379)) # Valor predeterminado si no se proporciona redis_password = os.getenv("REDIS_PASSWORD") HfFolder.save_token(hf_token) def connect_to_redis(): while True: try: redis_client = redis.Redis(host=redis_host, port=redis_port, password=redis_password) redis_client.ping() # Verifica si la conexión está activa print("Connected to Redis successfully.") return redis_client except (redis.exceptions.ConnectionError, redis.exceptions.TimeoutError, BrokenPipeError) as e: print(f"Connection to Redis failed: {e}. Retrying in 1 second...") time.sleep(1) def reconnect_if_needed(redis_client): try: redis_client.ping() except (redis.exceptions.ConnectionError, redis.exceptions.TimeoutError, BrokenPipeError): print("Reconnecting to Redis...") return connect_to_redis() return redis_client def load_object_from_redis(key): redis_client = connect_to_redis() redis_client = reconnect_if_needed(redis_client) try: obj_data = redis_client.get(key) return pickle.loads(obj_data) if obj_data else None except (pickle.PickleError, redis.exceptions.RedisError) as e: print(f"Failed to load object from Redis: {e}") return None def save_object_to_redis(key, obj): redis_client = connect_to_redis() redis_client = reconnect_if_needed(redis_client) try: redis_client.set(key, pickle.dumps(obj)) print(f"Object saved to Redis: {key}") except redis.exceptions.RedisError as e: print(f"Failed to save object to Redis: {e}") def get_model_or_download(model_id, redis_key, loader_func): model = load_object_from_redis(redis_key) if model: print(f"Model loaded from Redis: {redis_key}") return model try: model = loader_func(model_id, torch_dtype=torch.float16) save_object_to_redis(redis_key, model) print(f"Model downloaded and saved to Redis: {redis_key}") except Exception as e: print(f"Failed to load or save model: {e}") return None def generate_image(prompt): redis_key = f"generated_image_{prompt}" image = load_object_from_redis(redis_key) if not image: try: image = text_to_image_pipeline(prompt).images[0] save_object_to_redis(redis_key, image) except Exception as e: print(f"Failed to generate image: {e}") return None return image def edit_image_with_prompt(image, prompt, strength=0.75): redis_key = f"edited_image_{prompt}_{strength}" edited_image = load_object_from_redis(redis_key) if not edited_image: try: edited_image = img2img_pipeline(prompt=prompt, init_image=image.convert("RGB"), strength=strength).images[0] save_object_to_redis(redis_key, edited_image) except Exception as e: print(f"Failed to edit image: {e}") return None return edited_image def generate_song(prompt, duration=10): redis_key = f"generated_song_{prompt}_{duration}" song = load_object_from_redis(redis_key) if not song: try: song = music_gen.generate(prompt, duration=duration) save_object_to_redis(redis_key, song) except Exception as e: print(f"Failed to generate song: {e}") return None return song def generate_text(prompt): redis_key = f"generated_text_{prompt}" text = load_object_from_redis(redis_key) if not text: try: text = text_gen_pipeline([{"role": "user", "content": prompt}], max_new_tokens=256)[0]["generated_text"].strip() save_object_to_redis(redis_key, text) except Exception as e: print(f"Failed to generate text: {e}") return None return text def generate_flux_image(prompt): redis_key = f"generated_flux_image_{prompt}" flux_image = load_object_from_redis(redis_key) if not flux_image: try: flux_image = flux_pipeline( prompt, guidance_scale=0.0, num_inference_steps=4, max_sequence_length=256, generator=torch.Generator("cpu").manual_seed(0) ).images[0] save_object_to_redis(redis_key, flux_image) except Exception as e: print(f"Failed to generate flux image: {e}") return None return flux_image def generate_code(prompt): redis_key = f"generated_code_{prompt}" code = load_object_from_redis(redis_key) if not code: try: inputs = starcoder_tokenizer.encode(prompt, return_tensors="pt").to("cuda") outputs = starcoder_model.generate(inputs) code = starcoder_tokenizer.decode(outputs[0]) save_object_to_redis(redis_key, code) except Exception as e: print(f"Failed to generate code: {e}") return None return code def generate_video(prompt): redis_key = f"generated_video_{prompt}" video = load_object_from_redis(redis_key) if not video: try: pipe = DiffusionPipeline.from_pretrained("damo-vilab/text-to-video-ms-1.7b", torch_dtype=torch.float16) pipe.scheduler = DPMSolverMultistepScheduler.from_config(pipe.scheduler.config) pipe.enable_model_cpu_offload() video = export_to_video(pipe(prompt, num_inference_steps=25).frames) save_object_to_redis(redis_key, video) except Exception as e: print(f"Failed to generate video: {e}") return None return video def test_model_meta_llama(): redis_key = "meta_llama_test_response" response = load_object_from_redis(redis_key) if not response: try: messages = [ {"role": "system", "content": "You are a pirate chatbot who always responds in pirate speak!"}, {"role": "user", "content": "Who are you?"} ] response = meta_llama_pipeline(messages, max_new_tokens=256)[0]["generated_text"].strip() save_object_to_redis(redis_key, response) except Exception as e: print(f"Failed to test Meta-Llama: {e}") return None return response def train_model(model, dataset, epochs, batch_size, learning_rate): output_dir = io.BytesIO() training_args = TrainingArguments( output_dir=output_dir, num_train_epochs=epochs, per_device_train_batch_size=batch_size, learning_rate=learning_rate, ) trainer = Trainer(model=model, args=training_args, train_dataset=dataset) try: trainer.train() save_object_to_redis("trained_model", model) save_object_to_redis("training_results", output_dir.getvalue()) except Exception as e: print(f"Failed to train model: {e}") def run_task(task_queue): while True: task = task_queue.get() if task is None: break func, args, kwargs = task try: func(*args, **kwargs) except Exception as e: print(f"Failed to run task: {e}") task_queue = multiprocessing.Queue() num_processes = multiprocessing.cpu_count() processes = [] for _ in range(num_processes): p = multiprocessing.Process(target=run_task, args=(task_queue,)) p.start() processes.append(p) device = torch.device("cuda" if torch.cuda.is_available() else "cpu") # Cargar modelos text_to_image_pipeline = get_model_or_download("CompVis/stable-diffusion-v1-4", "text_to_image_model", StableDiffusionPipeline.from_pretrained) img2img_pipeline = get_model_or_download("CompVis/stable-diffusion-v1-4", "img2img_model", StableDiffusionImg2ImgPipeline.from_pretrained) flux_pipeline = get_model_or_download("black-forest-labs/FLUX.1-schnell", "flux_model", FluxPipeline.from_pretrained) text_gen_pipeline = transformers_pipeline("text-generation", model="bigcode/starcoder", tokenizer="bigcode/starcoder", device=0) music_gen = load_object_from_redis("music_gen") or MusicGen.from_pretrained('melody') meta_llama_pipeline = get_model_or_download("meta-llama/Meta-Llama-3.1-8B-Instruct", "meta_llama_model", transformers_pipeline) # Definir interfaces de usuario gen_image_tab = gr.Interface(generate_image, gr.inputs.Textbox(label="Prompt:"), gr.outputs.Image(type="pil"), title="Generate Image") edit_image_tab = gr.Interface(edit_image_with_prompt, [gr.inputs.Image(type="pil", label="Image:"), gr.inputs.Textbox(label="Prompt:"), gr.inputs.Slider(0.1, 1.0, 0.75, step=0.05, label="Strength:")], gr.outputs.Image(type="pil"), title="Edit Image") generate_song_tab = gr.Interface(generate_song, [gr.inputs.Textbox(label="Prompt:"), gr.inputs.Slider(5, 60, 10, step=1, label="Duration (s):")], gr.outputs.Audio(type="numpy"), title="Generate Songs") generate_text_tab = gr.Interface(generate_text, gr.inputs.Textbox(label="Prompt:"), gr.outputs.Textbox(label="Generated Text:"), title="Generate Text") generate_flux_image_tab = gr.Interface(generate_flux_image, gr.inputs.Textbox(label="Prompt:"), gr.outputs.Image(type="pil"), title="Generate FLUX Images") model_meta_llama_test_tab = gr.Interface(test_model_meta_llama, gr.inputs.Textbox(label="Test Input:"), gr.outputs.Textbox(label="Model Output:"), title="Test Meta-Llama") app = gr.TabbedInterface( [gen_image_tab, edit_image_tab, generate_song_tab, generate_text_tab, generate_flux_image_tab, model_meta_llama_test_tab], ["Generate Image", "Edit Image", "Generate Song", "Generate Text", "Generate FLUX Image", "Test Meta-Llama"] ) app.launch(share=True) for _ in range(num_processes): task_queue.put(None) for p in processes: p.join()