"""Gradio chat UI backed by Hugging Face inference providers.

The app accepts text plus optional image uploads, streams chat completions
from ``huggingface_hub.InferenceClient`` and routes "generate image ..."
style requests to a FLUX.1-schnell Space wrapped as a smolagents ``Tool``.
"""

import base64
import io
import json
import mimetypes
import os
from typing import Any

import gradio as gr
from huggingface_hub import InferenceClient
from PIL import Image

# Import smolagents Tool
from smolagents import Tool

ACCESS_TOKEN = os.getenv("HF_TOKEN")
if ACCESS_TOKEN:
    print("Access token loaded.")
else:
    # Only report success when the variable is actually set.
    print("Warning: HF_TOKEN is not set; requests need a BYOK token.")

# Initialize the image generation tool.
# This can be defined globally as it doesn't change per request.
try:
    image_generation_tool = Tool.from_space(
        "black-forest-labs/FLUX.1-schnell",
        name="image_generator",
        description="Generates an image from a text prompt. Use it when the user asks to 'generate an image of ...' or 'draw a picture of ...'. The input should be the descriptive prompt for the image.",
    )
    print("Image generation tool loaded successfully.")
except Exception as e:
    # Best effort: the chat still works without image generation.
    print(f"Error loading image generation tool: {e}")
    image_generation_tool = None

# Phrases that route a message to the image tool. They are tried in order and
# the first one found in the message wins, so the more specific "... of"
# variants come before the bare "generate image".
IMAGE_KEYWORDS = (
    "generate an image of",
    "generate image of",
    "generate image",
    "draw a picture of",
    "create an image of",
    "make an image of",
)

# URLs with these prefixes can be sent to the LLM as-is; anything else is
# treated as a local file that must be inlined as a base64 data URL.
_REMOTE_URL_PREFIXES = ("data:", "http://", "https://")

# Pillow image modes the JPEG writer can store without conversion.
_JPEG_SAFE_MODES = ("RGB", "L", "CMYK")


def _image_to_base64_jpeg(image):
    """Serialize a PIL image to JPEG and return it as a base64 string."""
    if image.mode not in _JPEG_SAFE_MODES:
        # JPEG cannot store alpha or palette data (RGBA, LA, P, ...).
        image = image.convert("RGB")
    buffered = io.BytesIO()
    image.save(buffered, format="JPEG")
    img_str = base64.b64encode(buffered.getvalue()).decode("utf-8")
    print("Image encoded successfully")
    return img_str


# Function to encode image to base64
def encode_image(image_path):
    """Return the image as a base64-encoded JPEG string.

    Args:
        image_path: A filesystem path (anything ``PIL.Image.open`` accepts)
            or an already loaded ``PIL.Image.Image``.

    Returns:
        The base64 string, or ``None`` when no input was given or the image
        could not be read/encoded (the error is printed, not raised).
    """
    if not image_path:
        print("No image path provided")
        return None

    try:
        print(f"Encoding image from path: {image_path}")
        # If it's already a PIL Image, encode it without closing it.
        if isinstance(image_path, Image.Image):
            return _image_to_base64_jpeg(image_path)
        # Otherwise open the file and make sure the handle is released.
        with Image.open(image_path) as image:
            return _image_to_base64_jpeg(image)
    except Exception as e:
        print(f"Error encoding image: {e}")
        return None


def _extract_image_prompt(message_text):
    """Return the text after the first matching image keyword.

    Returns ``None`` when the message is not an image generation request and
    an empty string when a keyword matched but no description followed it.
    """
    if not message_text:
        return None
    lowered = message_text.lower()
    for keyword in IMAGE_KEYWORDS:
        keyword_start_index = lowered.find(keyword)
        if keyword_start_index != -1:
            # Slice the original text so the prompt keeps the user's casing.
            return message_text[keyword_start_index + len(keyword):].strip()
    return None


def _image_part_for_llm(source):
    """Build an ``image_url`` content part from a path, URL or PIL image.

    Returns ``None`` when ``source`` is empty or cannot be encoded.
    """
    if not source:
        return None
    if isinstance(source, str) and source.startswith(_REMOTE_URL_PREFIXES):
        url = source
    else:
        encoded_image = encode_image(source)
        if not encoded_image:
            return None
        url = f"data:image/jpeg;base64,{encoded_image}"
    return {"type": "image_url", "image_url": {"url": url}}


def _history_user_content_for_llm(user_content):
    """Convert a stored user history entry into LLM message content.

    ``user`` stores uploaded images as local temp-file paths; those are
    inlined as data URLs here because a remote provider cannot read them.
    Non-list content (e.g. a plain string) is passed through unchanged.
    """
    if not isinstance(user_content, list):
        return user_content

    converted = []
    for item in user_content:
        if isinstance(item, dict) and item.get("type") == "image_url":
            image_part = _image_part_for_llm((item.get("image_url") or {}).get("url"))
            if image_part:
                converted.append(image_part)
        else:
            converted.append(item)
    return converted


def _file_entry_to_path(file_entry):
    """Return the filesystem path for one uploaded-file entry, or ``None``.

    NOTE(review): the exact entry type depends on the installed Gradio
    version, so plain paths, dicts and objects exposing ``name``/``path``
    are all accepted - confirm against the pinned version.
    """
    if not file_entry:
        return None
    if isinstance(file_entry, (str, os.PathLike)):
        return os.fspath(file_entry)
    if isinstance(file_entry, dict):
        return file_entry.get("path") or file_entry.get("name")
    return getattr(file_entry, "name", None) or getattr(file_entry, "path", None)


def respond(
    message_text,  # Text part of the current user message
    image_files,  # List of image paths (or URLs) for the current message
    history: list[list[Any]],  # Previous [user_content, assistant_response] pairs
    system_message,
    max_tokens,
    temperature,
    top_p,
    frequency_penalty,
    seed,
    provider,
    custom_api_key,
    custom_model,
    model_search_term,
    selected_model,
):
    """Generate the assistant reply for one turn.

    This is a generator. It yields dicts of the form
    ``{"type": "text", "content": <accumulated text>}`` while streaming an
    LLM answer, or a single ``{"type": "image", "path": <str>}`` when the
    message is routed to the image generation tool.

    Args:
        message_text: Text of the current user message (may be empty).
        image_files: Image paths attached to the current message.
        history: Earlier turns as ``[user_content, assistant_response]``.
        system_message: System prompt sent as the first message.
        max_tokens, temperature, top_p, frequency_penalty: Sampling
            parameters forwarded to ``chat_completion``.
        seed: Sampling seed; ``-1`` means "no fixed seed".
        provider: Inference provider name passed to ``InferenceClient``.
        custom_api_key: User supplied token; falls back to ``HF_TOKEN``.
        custom_model: Model id that overrides ``selected_model`` when set.
        model_search_term: Only logged.
        selected_model: Model chosen in the featured model radio.
    """
    api_key = (custom_api_key or "").strip()
    model_override = (custom_model or "").strip()

    print(f"Received message text: {message_text}")
    print(f"Received {len(image_files) if image_files else 0} image files: {image_files}")
    # print(f"History: {history}")  # Can be very verbose
    print(f"System message: {system_message}")
    print(f"Max tokens: {max_tokens}, Temperature: {temperature}, Top-P: {top_p}")
    print(f"Frequency Penalty: {frequency_penalty}, Seed: {seed}")
    print(f"Selected provider: {provider}")
    print(f"Custom API Key provided: {bool(api_key)}")
    print(f"Selected model (custom_model): {custom_model}")
    print(f"Model search term: {model_search_term}")
    print(f"Selected model from radio: {selected_model}")

    # Determine which token to use
    if api_key:
        token_to_use = api_key
        print("USING CUSTOM API KEY: BYOK token provided by user is being used for authentication")
    else:
        token_to_use = ACCESS_TOKEN
        print("USING DEFAULT API KEY: Environment variable HF_TOKEN is being used for authentication")

    image_prompt = _extract_image_prompt(message_text)
    if image_prompt is not None:
        if not image_generation_tool:
            yield {"type": "text", "content": "Image generation tool is not available or failed to load."}
            return

        print("Image generation request detected.")
        print(f"Extracted image prompt: {image_prompt}")
        if not image_prompt:
            yield {"type": "text", "content": "Please provide a description for the image you want to generate."}
            return

        try:
            generated_image_path = image_generation_tool(prompt=image_prompt)
        except Exception as e:
            print(f"Error during image generation tool call: {e}")
            yield {"type": "text", "content": f"Sorry, I couldn't generate the image. Error: {str(e)}"}
            return

        print(f"Image generated by tool, path: {generated_image_path}")
        yield {"type": "image", "path": str(generated_image_path)}  # Ensure path is string
        return

    # If not an image generation request, proceed with text/multimodal LLM call
    print("Proceeding with LLM call (text or multimodal).")
    client = InferenceClient(token=token_to_use, provider=provider)
    print(f"Hugging Face Inference Client initialized with {provider} provider.")

    if seed == -1:
        seed = None

    # Prepare the content of the current user message.
    llm_user_content = []
    if message_text and message_text.strip():
        llm_user_content.append({"type": "text", "text": message_text})
    for img_path in image_files or []:
        image_part = _image_part_for_llm(img_path)
        if image_part:
            llm_user_content.append(image_part)

    if not llm_user_content:  # Should not happen if user() filters empty messages
        print("No content for LLM, aborting.")
        yield {"type": "text", "content": "Please provide some input."}
        return

    messages_for_llm = [{"role": "system", "content": system_message}]
    print("Initial messages array constructed for LLM.")

    for turn in history or []:
        user_content_hist = turn[0]
        assistant_response_hist = turn[1]

        if user_content_hist:
            user_content_for_llm = _history_user_content_for_llm(user_content_hist)
            if user_content_for_llm:
                messages_for_llm.append({"role": "user", "content": user_content_for_llm})

        if not assistant_response_hist:
            continue
        if isinstance(assistant_response_hist, dict):
            # bot() stores generated images as {"path", "mime_type"}; the
            # {"type": "image"} shape yielded by this function is accepted too.
            # Other dict shapes are skipped.
            if assistant_response_hist.get("type") == "image" or "path" in assistant_response_hist:
                messages_for_llm.append({
                    "role": "assistant",
                    "content": [{
                        "type": "text",
                        "text": f"Assistant previously displayed image: {assistant_response_hist.get('path')}",
                    }],
                })
        elif isinstance(assistant_response_hist, str):
            messages_for_llm.append({"role": "assistant", "content": assistant_response_hist})

    messages_for_llm.append({"role": "user", "content": llm_user_content})
    # print(f"Full messages_for_llm: {messages_for_llm}")  # Can be very verbose

    model_to_use = model_override if model_override else selected_model
    print(f"Model selected for LLM inference: {model_to_use}")

    response_text = ""
    print(f"Sending request to {provider} provider for LLM.")

    parameters = {
        "max_tokens": max_tokens,
        "temperature": temperature,
        "top_p": top_p,
        "frequency_penalty": frequency_penalty,
    }
    if seed is not None:
        parameters["seed"] = seed

    try:
        stream = client.chat_completion(
            model=model_to_use,
            messages=messages_for_llm,
            stream=True,
            **parameters,
        )
        print("Received LLM tokens: ", end="", flush=True)
        for chunk in stream:
            choices = getattr(chunk, "choices", None)
            if not choices:
                continue
            delta = getattr(choices[0], "delta", None)
            token_text = getattr(delta, "content", None)
            if token_text:
                print(token_text, end="", flush=True)
                response_text += token_text
                # Yield the accumulated text so the UI can replace the bubble.
                yield {"type": "text", "content": response_text}
        print()
    except Exception as e:
        # Surface the failure in the chat instead of crashing the event.
        print(f"Error during LLM inference: {e}")
        response_text += f"\nError: {str(e)}"
        yield {"type": "text", "content": response_text}

    print("Completed LLM response generation.")


def validate_provider(api_key, provider):
    """Force the provider back to ``hf-inference`` when no BYOK key is set."""
    if not (api_key or "").strip() and provider != "hf-inference":
        return gr.update(value="hf-inference")
    return gr.update(value=provider)


# NOTE(review): the nesting of the layout containers below was reconstructed
# from the statement order of a whitespace-collapsed file - confirm the
# Accordion/Row/Column grouping against the intended UI.
with gr.Blocks(theme="Nymbo/Nymbo_Theme") as demo:
    chatbot = gr.Chatbot(
        height=600,
        show_copy_button=True,
        placeholder="Select a model and begin chatting. Now supports multiple inference providers and multimodal inputs. Try 'generate image of a cat playing chess'.",
        layout="panel",
        bubble_full_width=False,
    )
    print("Chatbot interface created.")

    msg = gr.MultimodalTextbox(
        placeholder="Type a message or upload images...",
        show_label=False,
        container=False,
        scale=12,
        file_types=["image"],
        file_count="multiple",
        sources=["upload"],
    )

    with gr.Accordion("Settings", open=False):
        system_message_box = gr.Textbox(
            value="You are a helpful AI assistant that can understand images and text. If asked to generate an image, respond by saying you will call the image_generator tool.",
            placeholder="You are a helpful assistant.",
            label="System Prompt",
        )

        with gr.Row():
            with gr.Column():
                max_tokens_slider = gr.Slider(minimum=1, maximum=4096, value=512, step=1, label="Max tokens")
                temperature_slider = gr.Slider(minimum=0.1, maximum=4.0, value=0.7, step=0.1, label="Temperature")
                top_p_slider = gr.Slider(minimum=0.1, maximum=1.0, value=0.95, step=0.05, label="Top-P")
            with gr.Column():
                frequency_penalty_slider = gr.Slider(minimum=-2.0, maximum=2.0, value=0.0, step=0.1, label="Frequency Penalty")
                seed_slider = gr.Slider(minimum=-1, maximum=65535, value=-1, step=1, label="Seed (-1 for random)")

        providers_list = [
            "hf-inference",
            "cerebras",
            "together",
            "sambanova",
            "novita",
            "cohere",
            "fireworks-ai",
            "hyperbolic",
            "nebius",
        ]
        provider_radio = gr.Radio(choices=providers_list, value="hf-inference", label="Inference Provider")
        byok_textbox = gr.Textbox(
            value="",
            label="BYOK (Bring Your Own Key)",
            info="Enter a custom Hugging Face API key here. When empty, only 'hf-inference' provider can be used.",
            placeholder="Enter your Hugging Face API token",
            type="password",
        )
        custom_model_box = gr.Textbox(
            value="",
            label="Custom Model",
            info="(Optional) Provide a custom Hugging Face model path. Overrides any selected featured model.",
            placeholder="meta-llama/Llama-3.3-70B-Instruct",
        )
        model_search_box = gr.Textbox(label="Filter Models", placeholder="Search for a featured model...", lines=1)

        models_list = [
            "meta-llama/Llama-3.2-11B-Vision-Instruct",
            "meta-llama/Llama-3.3-70B-Instruct",
            "meta-llama/Llama-3.1-70B-Instruct",
            "meta-llama/Llama-3.0-70B-Instruct",
            "meta-llama/Llama-3.2-3B-Instruct",
            "meta-llama/Llama-3.2-1B-Instruct",
            "meta-llama/Llama-3.1-8B-Instruct",
            "NousResearch/Hermes-3-Llama-3.1-8B",
            "NousResearch/Nous-Hermes-2-Mixtral-8x7B-DPO",
            "mistralai/Mistral-Nemo-Instruct-2407",
            "mistralai/Mixtral-8x7B-Instruct-v0.1",
            "mistralai/Mistral-7B-Instruct-v0.3",
            "mistralai/Mistral-7B-Instruct-v0.2",
            "Qwen/Qwen3-235B-A22B",
            "Qwen/Qwen3-32B",
            "Qwen/Qwen2.5-72B-Instruct",
            "Qwen/Qwen2.5-3B-Instruct",
            "Qwen/Qwen2.5-0.5B-Instruct",
            "Qwen/QwQ-32B",
            "Qwen/Qwen2.5-Coder-32B-Instruct",
            "microsoft/Phi-3.5-mini-instruct",
            "microsoft/Phi-3-mini-128k-instruct",
            "microsoft/Phi-3-mini-4k-instruct",
        ]
        featured_model_radio = gr.Radio(
            label="Select a model below",
            choices=models_list,
            value="meta-llama/Llama-3.2-11B-Vision-Instruct",
            interactive=True,
        )

        gr.Markdown("[View all Text-to-Text models](https://huggingface.co/models?inference_provider=all&pipeline_tag=text-generation&sort=trending) | [View all multimodal models](https://huggingface.co/models?inference_provider=all&pipeline_tag=image-text-to-text&sort=trending)")

    chat_history = gr.State([])

    def filter_models(search_term):
        """Restrict the featured model radio to ids containing the search term."""
        print(f"Filtering models with search term: {search_term}")
        needle = (search_term or "").lower()
        filtered = [m for m in models_list if needle in m.lower()]
        print(f"Filtered models: {filtered}")
        return gr.update(choices=filtered)

    def set_custom_model_from_radio(selected):
        """Copy the selected featured model into the custom model textbox."""
        print(f"Featured model selected: {selected}")
        return selected

    def user(user_multimodal_input, history):
        """Append the submitted text/images to the history as a new user turn.

        The new entry is ``[content_parts, None]`` where ``content_parts`` is
        a list of ``{"type": "text"}`` / ``{"type": "image_url"}`` dicts.
        Empty submissions leave the history untouched.
        """
        print(f"User input (raw from gr.MultimodalTextbox): {user_multimodal_input}")

        payload = user_multimodal_input or {}
        text_content = (payload.get("text") or "").strip()
        files = payload.get("files") or []
        if history is None:
            history = []

        if not text_content and not files:
            print("Empty input, skipping history append.")
            return history

        # Prepare content for history: a list of dicts for multimodal display
        history_user_entry_content = []
        if text_content:
            history_user_entry_content.append({"type": "text", "text": text_content})

        for file_entry in files:
            file_path = _file_entry_to_path(file_entry)
            if file_path:
                # Stored as a local path; respond() inlines it for the LLM.
                history_user_entry_content.append({"type": "image_url", "image_url": {"url": file_path}})
                print(f"Adding image to history entry: {file_path}")

        if history_user_entry_content:
            history.append([history_user_entry_content, None])  # User part, bot part (initially None)

        return history

    def bot(history, system_msg, max_tokens, temperature, top_p, freq_penalty, seed, provider, api_key, custom_model, search_term, selected_model):
        """Stream the assistant reply for the last user turn into the history.

        This is a generator. It yields the whole history after every chunk
        produced by ``respond`` so the chatbot refreshes while streaming.
        """
        if not history or not history[-1][0]:
            print("No user message to process in bot function or user message content is empty.")
            yield history  # Return current history without processing
            return

        user_content = history[-1][0]

        # Split the stored user content back into text and image paths.
        text_for_respond = ""
        image_files_for_respond = []
        if isinstance(user_content, str):
            # Defensive: tolerate a plain-string user message.
            text_for_respond = user_content
        else:
            for item in user_content:
                if not isinstance(item, dict):
                    continue
                if item.get("type") == "text":
                    text_for_respond = item["text"]
                elif item.get("type") == "image_url":
                    image_files_for_respond.append(item["image_url"]["url"])

        history[-1][1] = ""  # Initialize bot response

        for response_chunk in respond(
            text_for_respond,
            image_files_for_respond,
            history[:-1],  # Pass previous history
            system_msg,
            max_tokens,
            temperature,
            top_p,
            freq_penalty,
            seed,
            provider,
            api_key,
            custom_model,
            search_term,
            selected_model,
        ):
            if isinstance(response_chunk, dict):
                chunk_type = response_chunk.get("type")
                if chunk_type == "text":
                    # Text chunks carry the accumulated response, so each one
                    # replaces whatever the bot part currently holds.
                    history[-1][1] = response_chunk["content"]
                elif chunk_type == "image":
                    # An image from the tool is the whole response for the turn.
                    image_path = response_chunk["path"]
                    mime_type = mimetypes.guess_type(image_path)[0] or "image/jpeg"
                    history[-1][1] = {"path": image_path, "mime_type": mime_type}
            yield history

    msg.submit(
        user,
        [msg, chatbot],
        [chatbot],
        queue=False,
    ).then(
        bot,
        [
            chatbot,
            system_message_box,
            max_tokens_slider,
            temperature_slider,
            top_p_slider,
            frequency_penalty_slider,
            seed_slider,
            provider_radio,
            byok_textbox,
            custom_model_box,
            model_search_box,
            featured_model_radio,
        ],
        [chatbot],
    ).then(
        lambda: {"text": "", "files": []},  # Clear MultimodalTextbox
        None,
        [msg],
    )

    model_search_box.change(fn=filter_models, inputs=model_search_box, outputs=featured_model_radio)
    print("Model search box change event linked.")

    featured_model_radio.change(fn=set_custom_model_from_radio, inputs=featured_model_radio, outputs=custom_model_box)
    print("Featured model radio button change event linked.")

    byok_textbox.change(fn=validate_provider, inputs=[byok_textbox, provider_radio], outputs=provider_radio)
    print("BYOK textbox change event linked.")

    provider_radio.change(fn=validate_provider, inputs=[byok_textbox, provider_radio], outputs=provider_radio)
    print("Provider radio button change event linked.")

print("Gradio interface initialized.")

if __name__ == "__main__":
    print("Launching the demo application.")
    demo.launch(show_api=True)