"""Gradio service that turns paragraphs of text into images with SDXL-Turbo.

Each paragraph in the incoming sentence mapping is converted to a prompt via
``generate_prompt`` and rendered by a shared diffusers text-to-image pipeline.
"""

import asyncio
import os
from concurrent.futures import ThreadPoolExecutor
from io import BytesIO

import gradio as gr
from diffusers import AutoPipelineForText2Image

from generate_prompts import generate_prompt

# Load the model once at import time so every request reuses the same pipeline.
print("Loading the model...")
model = AutoPipelineForText2Image.from_pretrained("stabilityai/sdxl-turbo")
print("Model loaded successfully.")


def generate_image(prompt, prompt_name):
    """Render *prompt* with the shared pipeline and return it as JPEG bytes.

    Args:
        prompt: Text prompt passed to the pipeline.
        prompt_name: Label used only in log messages.

    Returns:
        The JPEG-encoded image as ``bytes``, or ``None`` if the pipeline
        failed, returned no images, or the image could not be encoded.
        This function never raises; every failure is logged and mapped to
        ``None`` so one bad prompt does not abort the whole batch.
    """
    try:
        print(f"Generating response for {prompt_name} with prompt: {prompt}")
        output = model(prompt=prompt, num_inference_steps=1, guidance_scale=0.0)
        print(f"Output for {prompt_name}: {output}")
        images = output.images
    except Exception as e:  # boundary: a failed prompt must not kill the batch
        print(f"Error generating image for {prompt_name}: {e}")
        return None

    # Guard against a pipeline result that carries no usable image.
    if not isinstance(images, list) or not images:
        print(
            f"Error generating image for {prompt_name}: "
            f"No images returned by the model for {prompt_name}."
        )
        return None

    buffered = BytesIO()
    try:
        images[0].save(buffered, format="JPEG")
    except Exception as e:
        print(f"Error saving image for {prompt_name}: {e}")
        return None

    image_bytes = buffered.getvalue()
    print(f"Image bytes length for {prompt_name}: {len(image_bytes)}")
    return image_bytes


async def queue_api_calls(sentence_mapping, character_dict, selected_style):
    """Build one prompt per paragraph and generate all images concurrently.

    Args:
        sentence_mapping: Mapping of paragraph number to a list of sentences.
        character_dict: Passed through unchanged to ``generate_prompt``.
        selected_style: Passed through unchanged to ``generate_prompt``.

    Returns:
        A dict mapping each paragraph number to the value returned by
        ``generate_image`` for it (JPEG bytes, or ``None`` on failure).
        An empty or missing ``sentence_mapping`` yields ``{}``.
    """
    print(f"queue_api_calls invoked with sentence_mapping: {sentence_mapping}, character_dict: {character_dict}, selected_style: {selected_style}")

    # Nothing to do; also avoids ThreadPoolExecutor(max_workers=0), which
    # raises ValueError.
    if not sentence_mapping:
        print("No paragraphs supplied; nothing to generate.")
        return {}

    # Generate prompts for each paragraph
    prompts = []
    for paragraph_number, sentences in sentence_mapping.items():
        combined_sentence = " ".join(sentences)
        print(f"combined_sentence for paragraph {paragraph_number}: {combined_sentence}")
        prompt = generate_prompt(combined_sentence, sentence_mapping, character_dict, selected_style)
        prompts.append((paragraph_number, prompt))
        print(f"Generated prompt for paragraph {paragraph_number}: {prompt}")

    # One worker per prompt so every image is requested at the same time.
    # NOTE(review): all workers call the same module-level pipeline object;
    # confirm that concurrent calls on it are safe.
    max_workers = len(prompts)

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        loop = asyncio.get_running_loop()
        tasks = [
            loop.run_in_executor(executor, generate_image, prompt, f"Prompt {paragraph_number}")
            for paragraph_number, prompt in prompts
        ]
        print("Tasks created for image generation.")
        responses = await asyncio.gather(*tasks)
        print("Responses received from image generation tasks.")

    # gather() preserves submission order, so responses line up with prompts.
    images = {
        paragraph_number: response
        for (paragraph_number, _), response in zip(prompts, responses)
    }
    print(f"Images generated: {images}")
    return images


def process_prompt(sentence_mapping, character_dict, selected_style):
    """Synchronous entry point used by the Gradio interface.

    Runs ``queue_api_calls`` to completion and returns its result: a dict
    mapping paragraph number to JPEG bytes (or ``None`` for failed prompts).

    Args:
        sentence_mapping: Mapping of paragraph number to a list of sentences.
        character_dict: Forwarded to ``queue_api_calls``.
        selected_style: Forwarded to ``queue_api_calls``.
    """
    print(f"process_prompt called with sentence_mapping: {sentence_mapping}, character_dict: {character_dict}, selected_style: {selected_style}")

    def _run_in_fresh_loop():
        # asyncio.run creates a new event loop, runs the coroutine and always
        # closes the loop afterwards, so no loop is leaked per request.
        return asyncio.run(queue_api_calls(sentence_mapping, character_dict, selected_style))

    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread: drive the coroutine here.
        print("Event loop created.")
        cmpt_return = _run_in_fresh_loop()
    else:
        # A loop is already running in this thread. Calling
        # run_until_complete on it would raise RuntimeError, so run the
        # coroutine on its own loop in a helper thread and wait for it.
        # This blocks the calling thread until the images are ready.
        with ThreadPoolExecutor(max_workers=1) as executor:
            cmpt_return = executor.submit(_run_in_fresh_loop).result()

    print(f"process_prompt completed with return value: {cmpt_return}")
    return cmpt_return


# Gradio interface
# NOTE(review): process_prompt returns raw bytes values while the output
# component is "json"; confirm the installed Gradio version can serialize them.
gradio_interface = gr.Interface(
    fn=process_prompt,
    inputs=[
        gr.JSON(label="Sentence Mapping"),
        gr.JSON(label="Character Dict"),
        gr.Dropdown(["oil painting", "sketch", "watercolor"], label="Selected Style")
    ],
    outputs="json"
)

if __name__ == "__main__":
    print("Launching Gradio interface...")
    gradio_interface.launch()
    print("Gradio interface launched.")