"""Gradio chat front-end for Hugging Face inference providers.

Features visible in this module:
  * streaming chat completions through ``huggingface_hub.InferenceClient``
  * image uploads, sent to the model as base64 JPEG data URLs
  * optional text-to-speech through an MCP server listed in ``MCP_CONFIG``
"""

import base64
import io
import json
import logging
import os
import time
from typing import Any, Dict, List, Optional, Union

import gradio as gr
import requests
from huggingface_hub import InferenceClient
from PIL import Image

# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

ACCESS_TOKEN = os.getenv("HF_TOKEN")
if ACCESS_TOKEN:
    logger.info("Access token loaded.")
else:
    # Do not claim the token was loaded when the variable is missing.
    logger.warning("HF_TOKEN is not set; only a user-supplied key can authenticate requests.")

# (connect, read) timeout in seconds for every MCP HTTP call. Without a
# timeout, ``requests`` waits forever on a stalled server and blocks the chat.
MCP_REQUEST_TIMEOUT = (10, 120)

# Prefix used by ``user`` to mark image-only chat turns; ``bot`` parses it back.
_IMAGE_MD_PREFIX = "![Image]("

# MCP Client Configuration
MCP_SERVERS = {}
try:
    mcp_config = os.getenv("MCP_CONFIG")
    if mcp_config:
        _loaded_config = json.loads(mcp_config)
        if isinstance(_loaded_config, dict):
            MCP_SERVERS = _loaded_config
            logger.info(f"Loaded MCP configuration: {len(MCP_SERVERS)} servers defined")
        else:
            # The rest of the module calls .keys()/.get() on this mapping.
            logger.error("Error loading MCP configuration: expected a JSON object of servers")
except ValueError as e:  # json.JSONDecodeError is a ValueError
    logger.error(f"Error loading MCP configuration: {e}")


# Function to encode image to base64
def encode_image(image_path):
    """Return the image as a base64-encoded JPEG string, or ``None`` on failure.

    Args:
        image_path: A filesystem path (anything ``PIL.Image.open`` accepts) or
            an already-loaded ``PIL.Image.Image``.

    Returns:
        The base64 text of the JPEG bytes, or ``None`` when no input was given
        or the image could not be read/encoded (the error is logged).
    """
    if not image_path:
        logger.warning("No image path provided")
        return None

    try:
        logger.info(f"Encoding image from path: {image_path}")

        if isinstance(image_path, Image.Image):
            # JPEG cannot store alpha or palette data, so normalise to RGB.
            image = image_path if image_path.mode == "RGB" else image_path.convert("RGB")
        else:
            # convert() returns a fully loaded copy, so the file handle can be
            # closed as soon as the with-block exits.
            with Image.open(image_path) as opened:
                image = opened.convert("RGB")

        buffered = io.BytesIO()
        image.save(buffered, format="JPEG")
        img_str = base64.b64encode(buffered.getvalue()).decode("utf-8")
        logger.info("Image encoded successfully")
        return img_str
    except Exception as e:  # best-effort: a bad image must not break the chat
        logger.error(f"Error encoding image: {e}")
        return None


# MCP Client implementation
class MCPClient:
    """Minimal HTTP client for an MCP server.

    Uses the ``/connect``, ``/tools/list`` and ``/tools/call`` endpoints and
    sends the session ID returned by ``/connect`` in the ``X-MCP-Session``
    header.
    """

    def __init__(self, server_url: str):
        self.server_url = server_url
        self.session_id = None  # set by connect()
        logger.info(f"Initialized MCP Client for server: {server_url}")

    def connect(self) -> bool:
        """Establish connection with the MCP server.

        Returns:
            ``True`` when the server answered with HTTP 200 (the session ID
            from the response is stored), ``False`` otherwise.
        """
        try:
            response = requests.post(
                f"{self.server_url}/connect",
                json={"client": "Serverless-TextGen-Hub", "version": "1.0.0"},
                timeout=MCP_REQUEST_TIMEOUT,
            )
            if response.status_code == 200:
                result = response.json()
                self.session_id = result.get("session_id")
                logger.info(f"Connected to MCP server with session ID: {self.session_id}")
                return True
            logger.error(f"Failed to connect to MCP server: {response.status_code} - {response.text}")
            return False
        except (requests.RequestException, ValueError, AttributeError) as e:
            # ValueError/AttributeError cover a non-JSON or non-object body.
            logger.error(f"Error connecting to MCP server: {e}")
            return False

    def list_tools(self) -> List[Dict]:
        """List available tools from the MCP server.

        Connects first if there is no session yet.

        Returns:
            The ``tools`` list from the server response, or ``[]`` on any
            failure (the failure is logged).
        """
        if not self.session_id and not self.connect():
            return []

        try:
            response = requests.get(
                f"{self.server_url}/tools/list",
                headers={"X-MCP-Session": self.session_id},
                timeout=MCP_REQUEST_TIMEOUT,
            )
            if response.status_code == 200:
                result = response.json()
                tools = result.get("tools", [])
                logger.info(f"Retrieved {len(tools)} tools from MCP server")
                return tools
            logger.error(f"Failed to list tools: {response.status_code} - {response.text}")
            return []
        except (requests.RequestException, ValueError, AttributeError, TypeError) as e:
            logger.error(f"Error listing tools: {e}")
            return []

    def call_tool(self, tool_name: str, args: Dict) -> Dict:
        """Call a tool on the MCP server.

        Connects first if there is no session yet.

        Args:
            tool_name: Name of the tool as reported by ``list_tools``.
            args: Arguments forwarded as the ``arguments`` field.

        Returns:
            The decoded JSON response on HTTP 200, otherwise a dict with a
            single ``"error"`` key describing the failure.
        """
        if not self.session_id and not self.connect():
            return {"error": "Not connected to MCP server"}

        try:
            response = requests.post(
                f"{self.server_url}/tools/call",
                headers={"X-MCP-Session": self.session_id},
                json={"name": tool_name, "arguments": args},
                timeout=MCP_REQUEST_TIMEOUT,
            )
            if response.status_code == 200:
                result = response.json()
                logger.info(f"Successfully called tool {tool_name}")
                return result
            error_msg = f"Failed to call tool {tool_name}: {response.status_code} - {response.text}"
            logger.error(error_msg)
            return {"error": error_msg}
        except (requests.RequestException, ValueError) as e:
            error_msg = f"Error calling tool {tool_name}: {e}"
            logger.error(error_msg)
            return {"error": error_msg}


# Text-to-speech client function
def text_to_speech(text: str, server_name: Optional[str] = None) -> Optional[str]:
    """Convert text to speech using an MCP TTS server.

    Args:
        text: The text to synthesise.
        server_name: Key into ``MCP_SERVERS`` naming the server to use.

    Returns:
        An audio data URL that can be embedded in the chat, or ``None`` when
        the server is not configured, exposes no TTS tool, or the call failed.
    """
    if not server_name or server_name not in MCP_SERVERS:
        logger.warning(f"TTS server {server_name} not configured")
        return None

    server_cfg = MCP_SERVERS[server_name]
    server_url = server_cfg.get("url") if isinstance(server_cfg, dict) else None
    if not server_url:
        logger.warning(f"No URL found for TTS server {server_name}")
        return None

    client = MCPClient(server_url)

    # List available tools to find the TTS tool. ``.get`` is used because a
    # tool entry without a "name" would otherwise raise KeyError here.
    tools = client.list_tools()
    tts_tool = None
    for tool in tools:
        tool_name = tool.get("name", "") if isinstance(tool, dict) else ""
        if isinstance(tool_name, str) and ("text_to_audio" in tool_name or "tts" in tool_name):
            tts_tool = tool
            break

    if not tts_tool:
        logger.warning(f"No TTS tool found on server {server_name}")
        return None

    # Call the TTS tool
    result = client.call_tool(tts_tool["name"], {"text": text, "speed": 1.0})
    if not isinstance(result, dict):
        logger.error(f"Unexpected TTS result format: {type(result)}")
        return None
    if "error" in result:
        logger.error(f"TTS error: {result['error']}")
        return None

    # Process the result - usually a base64 encoded WAV
    audio_data = result.get("audio") or result.get("content") or result.get("result")
    if isinstance(audio_data, str):
        if audio_data.startswith("data:audio"):
            return audio_data  # already a data URL
        return f"data:audio/wav;base64,{audio_data}"  # assume bare base64

    logger.error(f"Unexpected TTS result format: {type(audio_data)}")
    return None


def _image_content_parts(images) -> List[Dict[str, Any]]:
    """Encode each image and wrap it as an ``image_url`` chat content part.

    Images that are empty or fail to encode are skipped.
    """
    parts = []
    for img in images or []:
        if not img:
            continue
        encoded = encode_image(img)
        if encoded:
            parts.append({
                "type": "image_url",
                "image_url": {"url": f"data:image/jpeg;base64,{encoded}"},
            })
    return parts


def respond(
    message,
    image_files,  # Changed parameter name and structure
    history: list[tuple[str, str]],
    system_message,
    max_tokens,
    temperature,
    top_p,
    frequency_penalty,
    seed,
    provider,
    custom_api_key,
    custom_model,
    model_search_term,
    selected_model,
    tts_enabled=False,
    tts_server=None
):
    """Stream a chat completion for ``message`` (and optional images).

    This is a generator. It yields the accumulated response text after each
    received token; if TTS succeeds it yields once more with an audio player
    appended; if inference fails it yields the text followed by the error.

    Args:
        message: Text of the latest user turn (may be empty for image turns).
        image_files: Image paths/PIL images attached to the latest turn.
        history: Earlier ``(user, assistant)`` pairs. A user entry may be a
            2-tuple ``(text, images)`` for a multimodal turn.
        system_message: System prompt placed first in the message list.
        max_tokens, temperature, top_p, frequency_penalty: Sampling settings
            forwarded to ``chat_completion``.
        seed: Sampling seed; ``-1`` means random (not forwarded).
        provider: Inference provider name for ``InferenceClient``.
        custom_api_key: User-supplied token; falls back to ``HF_TOKEN``.
        custom_model: Model ID overriding ``selected_model`` when non-empty.
        model_search_term: Current filter text (only logged here).
        selected_model: Model chosen in the featured-model radio.
        tts_enabled: Whether to convert the final response to speech.
        tts_server: Name of the MCP server used for TTS.
    """
    # None-safe: the UI normally sends strings, but other callers may not.
    api_key = (custom_api_key or "").strip()
    custom_model_name = (custom_model or "").strip()

    logger.info(f"Received message: {message}")
    logger.info(f"Received {len(image_files) if image_files else 0} images")
    logger.info(f"History: {history}")
    logger.info(f"System message: {system_message}")
    logger.info(f"Max tokens: {max_tokens}, Temperature: {temperature}, Top-P: {top_p}")
    logger.info(f"Frequency Penalty: {frequency_penalty}, Seed: {seed}")
    logger.info(f"Selected provider: {provider}")
    logger.info(f"Custom API Key provided: {bool(api_key)}")
    logger.info(f"Selected model (custom_model): {custom_model}")
    logger.info(f"Model search term: {model_search_term}")
    logger.info(f"Selected model from radio: {selected_model}")
    logger.info(f"TTS enabled: {tts_enabled}, TTS server: {tts_server}")

    # Determine which token to use
    if api_key:
        token_to_use = api_key
        logger.info("USING CUSTOM API KEY: BYOK token provided by user is being used for authentication")
    else:
        token_to_use = ACCESS_TOKEN
        logger.info("USING DEFAULT API KEY: Environment variable HF_TOKEN is being used for authentication")

    # Initialize the Inference Client with the provider and appropriate token
    client = InferenceClient(token=token_to_use, provider=provider)
    logger.info(f"Hugging Face Inference Client initialized with {provider} provider.")

    # Convert seed to None if -1 (meaning random)
    if seed == -1:
        seed = None

    # Create multimodal content if images are present
    if image_files:
        user_content = []
        if message and message.strip():
            user_content.append({"type": "text", "text": message})
        user_content.extend(_image_content_parts(image_files))
    else:
        # Text-only message
        user_content = message

    # Prepare messages in the format expected by the API
    messages = [{"role": "system", "content": system_message}]
    logger.info("Initial messages array constructed.")

    # Add conversation history to the context
    for val in history or []:
        user_part = val[0]
        assistant_part = val[1]

        if user_part:
            # Handle both text-only and multimodal messages in history
            if isinstance(user_part, tuple) and len(user_part) == 2:
                # This is a multimodal message with text and images
                history_content = []
                if user_part[0]:
                    history_content.append({"type": "text", "text": user_part[0]})
                history_content.extend(_image_content_parts(user_part[1]))
                messages.append({"role": "user", "content": history_content})
            else:
                # Regular text message
                messages.append({"role": "user", "content": user_part})
            logger.info(f"Added user message to context (type: {type(user_part)})")

        if assistant_part:
            messages.append({"role": "assistant", "content": assistant_part})
            logger.info(f"Added assistant message to context: {assistant_part}")

    # Append the latest user message
    messages.append({"role": "user", "content": user_content})
    logger.info(f"Latest user message appended (content type: {type(user_content)})")

    # Determine which model to use, prioritizing custom_model if provided
    model_to_use = custom_model_name if custom_model_name else selected_model
    logger.info(f"Model selected for inference: {model_to_use}")

    # Start with an empty string to build the response as tokens stream in
    response = ""
    logger.info(f"Sending request to {provider} provider.")

    # Prepare parameters for the chat completion request
    parameters = {
        "max_tokens": max_tokens,
        "temperature": temperature,
        "top_p": top_p,
        "frequency_penalty": frequency_penalty,
    }
    if seed is not None:
        parameters["seed"] = seed

    # Use the InferenceClient for making the request
    try:
        stream = client.chat_completion(
            model=model_to_use,
            messages=messages,
            stream=True,
            **parameters
        )

        logger.info("Received tokens: ")

        # Process the streaming response
        for chunk in stream:
            choices = getattr(chunk, "choices", None)
            if not choices:
                continue
            delta = getattr(choices[0], "delta", None)
            token_text = getattr(delta, "content", None)
            if token_text:
                print(token_text, end="", flush=True)
                response += token_text
                yield response

        # If TTS is enabled and we have a response, convert it to speech
        if tts_enabled and tts_server and response:
            logger.info(f"Converting response to speech using TTS server: {tts_server}")
            try:
                audio_url = text_to_speech(response, tts_server)
                if audio_url:
                    # Add audio tag to the end of the response. Previously only
                    # blank lines were appended and audio_url was never used,
                    # so the generated audio was never shown.
                    response += f'\n\n<audio controls src="{audio_url}"></audio>'
                    yield response
                else:
                    logger.warning("TTS conversion failed, continuing without audio")
            except Exception as e:
                # Continue without TTS if there's an error
                logger.error(f"Error in TTS conversion: {e}")

        print()
    except Exception as e:  # top-level boundary: surface the error in the chat
        logger.error(f"Error during inference: {e}")
        response += f"\nError: {str(e)}"
        yield response

    logger.info("Completed response generation.")


# Function to validate provider selection based on BYOK
def validate_provider(api_key, provider):
    """Force the ``hf-inference`` provider when no user API key is given.

    Returns:
        A ``gr.update`` setting the provider radio to ``"hf-inference"`` when
        ``api_key`` is empty/blank and another provider was chosen, otherwise
        one that keeps ``provider``.
    """
    if not (api_key or "").strip() and provider != "hf-inference":
        return gr.update(value="hf-inference")
    return gr.update(value=provider)


# Function to list available MCP servers
def list_mcp_servers():
    """List all configured MCP servers"""
    return list(MCP_SERVERS.keys())


# Help text shown when MCP_CONFIG defines no servers. Built from explicit lines
# so the rendered markdown does not depend on source indentation.
_NO_MCP_SERVERS_HELP = "\n".join([
    "No MCP servers configured. Add them using the MCP_CONFIG environment variable:",
    "```json",
    "{",
    '  "kokoroTTS": {',
    '    "url": "https://your-kokoro-tts-server/gradio_api/mcp/sse"',
    "  }",
    "}",
    "```",
])


# GRADIO UI
with gr.Blocks(theme="Nymbo/Nymbo_Theme") as demo:
    # Create the chatbot component
    chatbot = gr.Chatbot(
        height=600,
        show_copy_button=True,
        placeholder="Select a model and begin chatting. Now supports multiple inference providers, multimodal inputs, and MCP servers",
        layout="panel"
    )
    logger.info("Chatbot interface created.")

    # Multimodal textbox for messages (combines text and file uploads)
    msg = gr.MultimodalTextbox(
        placeholder="Type a message or upload images...",
        show_label=False,
        container=False,
        scale=12,
        file_types=["image"],
        file_count="multiple",
        sources=["upload"]
    )

    # Create accordion for settings
    with gr.Accordion("Settings", open=False):
        # System message
        system_message_box = gr.Textbox(
            value="You are a helpful AI assistant that can understand images and text.",
            placeholder="You are a helpful assistant.",
            label="System Prompt"
        )

        # Generation parameters
        with gr.Row():
            with gr.Column():
                max_tokens_slider = gr.Slider(
                    minimum=1,
                    maximum=4096,
                    value=512,
                    step=1,
                    label="Max tokens"
                )
                temperature_slider = gr.Slider(
                    minimum=0.1,
                    maximum=4.0,
                    value=0.7,
                    step=0.1,
                    label="Temperature"
                )
                top_p_slider = gr.Slider(
                    minimum=0.1,
                    maximum=1.0,
                    value=0.95,
                    step=0.05,
                    label="Top-P"
                )
            with gr.Column():
                frequency_penalty_slider = gr.Slider(
                    minimum=-2.0,
                    maximum=2.0,
                    value=0.0,
                    step=0.1,
                    label="Frequency Penalty"
                )
                seed_slider = gr.Slider(
                    minimum=-1,
                    maximum=65535,
                    value=-1,
                    step=1,
                    label="Seed (-1 for random)"
                )

        # Provider selection
        providers_list = [
            "hf-inference",  # Default Hugging Face Inference
            "cerebras",      # Cerebras provider
            "together",      # Together AI
            "sambanova",     # SambaNova
            "novita",        # Novita AI
            "cohere",        # Cohere
            "fireworks-ai",  # Fireworks AI
            "hyperbolic",    # Hyperbolic
            "nebius",        # Nebius
        ]

        provider_radio = gr.Radio(
            choices=providers_list,
            value="hf-inference",
            label="Inference Provider",
        )

        # New BYOK textbox
        byok_textbox = gr.Textbox(
            value="",
            label="BYOK (Bring Your Own Key)",
            info="Enter a custom Hugging Face API key here. When empty, only 'hf-inference' provider can be used.",
            placeholder="Enter your Hugging Face API token",
            type="password"  # Hide the API key for security
        )

        # Custom model box
        custom_model_box = gr.Textbox(
            value="",
            label="Custom Model",
            info="(Optional) Provide a custom Hugging Face model path. Overrides any selected featured model.",
            placeholder="meta-llama/Llama-3.3-70B-Instruct"
        )

        # Model search
        model_search_box = gr.Textbox(
            label="Filter Models",
            placeholder="Search for a featured model...",
            lines=1
        )

        # Featured models list
        # Updated to include multimodal models
        models_list = [
            "meta-llama/Llama-3.2-11B-Vision-Instruct",
            "meta-llama/Llama-3.3-70B-Instruct",
            "meta-llama/Llama-3.1-70B-Instruct",
            "meta-llama/Llama-3.0-70B-Instruct",
            "meta-llama/Llama-3.2-3B-Instruct",
            "meta-llama/Llama-3.2-1B-Instruct",
            "meta-llama/Llama-3.1-8B-Instruct",
            "NousResearch/Hermes-3-Llama-3.1-8B",
            "NousResearch/Nous-Hermes-2-Mixtral-8x7B-DPO",
            "mistralai/Mistral-Nemo-Instruct-2407",
            "mistralai/Mixtral-8x7B-Instruct-v0.1",
            "mistralai/Mistral-7B-Instruct-v0.3",
            "mistralai/Mistral-7B-Instruct-v0.2",
            "Qwen/Qwen3-235B-A22B",
            "Qwen/Qwen3-32B",
            "Qwen/Qwen2.5-72B-Instruct",
            "Qwen/Qwen2.5-3B-Instruct",
            "Qwen/Qwen2.5-0.5B-Instruct",
            "Qwen/QwQ-32B",
            "Qwen/Qwen2.5-Coder-32B-Instruct",
            "microsoft/Phi-3.5-mini-instruct",
            "microsoft/Phi-3-mini-128k-instruct",
            "microsoft/Phi-3-mini-4k-instruct",
        ]

        featured_model_radio = gr.Radio(
            label="Select a model below",
            choices=models_list,
            value="meta-llama/Llama-3.2-11B-Vision-Instruct",  # Default to a multimodal model
            interactive=True
        )

        gr.Markdown("[View all Text-to-Text models](https://huggingface.co/models?inference_provider=all&pipeline_tag=text-generation&sort=trending) | [View all multimodal models](https://huggingface.co/models?inference_provider=all&pipeline_tag=image-text-to-text&sort=trending)")

    # MCP TTS integration
    with gr.Accordion("MCP Integration", open=False):
        gr.Markdown("## Model Context Protocol (MCP) Integration")
        gr.Markdown("Connect to MCP servers to extend functionality.")

        tts_enabled = gr.Checkbox(
            label="Enable Text-to-Speech",
            value=False,
            info="When enabled, responses will be converted to speech using the selected MCP TTS server"
        )

        # Create dropdown for available MCP servers
        available_servers = list_mcp_servers()
        tts_server = gr.Dropdown(
            label="TTS Server",
            choices=available_servers,
            value=available_servers[0] if available_servers else None,
            interactive=True,
            visible=len(available_servers) > 0
        )

        # If no servers configured, show a message
        if not available_servers:
            gr.Markdown(_NO_MCP_SERVERS_HELP)

    # Chat history state
    chat_history = gr.State([])

    # Function to filter models
    def filter_models(search_term):
        """Return a radio update listing featured models that contain ``search_term``.

        Matching is case-insensitive; an empty or ``None`` term keeps every model.
        """
        logger.info(f"Filtering models with search term: {search_term}")
        needle = (search_term or "").lower()
        filtered = [m for m in models_list if needle in m.lower()]
        logger.info(f"Filtered models: {filtered}")
        return gr.update(choices=filtered)

    # Function to set custom model from radio
    def set_custom_model_from_radio(selected):
        """Copy the selected featured model into the custom-model textbox."""
        logger.info(f"Featured model selected: {selected}")
        return selected

    # Function for the chat interface
    def user(user_message, history):
        """Append the submitted multimodal message to the chat history.

        Text (if any) is added as one turn. Each uploaded file is then added
        as its own turn, written as ``![Image](path)`` so ``bot`` can
        recognise it later.

        Args:
            user_message: Dict with ``"text"`` and ``"files"`` keys, as read
                from the ``MultimodalTextbox``.
            history: Current chatbot history (list of ``[user, assistant]``).

        Returns:
            The (mutated) history list.
        """
        # Debug logging for troubleshooting
        logger.info(f"User message received: {user_message}")

        # Skip if message is empty (no text and no files)
        if not user_message or (not user_message.get("text") and not user_message.get("files")):
            logger.info("Empty message, skipping")
            return history

        if history is None:
            history = []

        # Prepare multimodal message format
        text_content = (user_message.get("text") or "").strip()
        files = user_message.get("files") or []

        logger.info(f"Text content: {text_content}")
        logger.info(f"Files: {files}")

        # If both text and files are empty, skip
        if not text_content and not files:
            logger.info("No content to display")
            return history

        if not files:
            # For text-only messages
            logger.info(f"Adding text-only message: {text_content}")
            history.append([text_content, None])
            return history

        # Add text message first if it exists
        if text_content:
            logger.info(f"Adding text message: {text_content}")
            history.append([text_content, None])

        # Then add each image file separately
        for file_path in files:
            if file_path and isinstance(file_path, str):
                logger.info(f"Adding image: {file_path}")
                # Add image as a separate message with no text
                history.append([f"![Image]({file_path})", None])

        return history

    # Define bot response function
    def bot(history, system_msg, max_tokens, temperature, top_p, freq_penalty, seed,
            provider, api_key, custom_model, search_term, selected_model,
            tts_enabled, tts_server):
        """Generate the assistant reply for the last history entry, streaming updates.

        This is a generator. It yields the history after every partial
        response, with the last entry's assistant slot filled in. When the
        last user turn is an image marker, the preceding text turn (if any)
        is sent along as the prompt.
        """
        # Check if history is valid
        if not history:
            logger.info("No history to process")
            return

        # Get the most recent message and detect if it's an image
        user_message = history[-1][0]
        logger.info(f"Processing user message: {user_message}")

        prompt_text = user_message
        images = None

        # Check if this is an image message (marked with ![Image])
        if isinstance(user_message, str) and user_message.startswith(_IMAGE_MD_PREFIX):
            # Strip only the wrapper. The previous str.replace(")", "") also
            # removed parentheses inside the path itself (e.g. "photo (1).png").
            image_path = user_message[len(_IMAGE_MD_PREFIX):]
            if image_path.endswith(")"):
                image_path = image_path[:-1]
            logger.info(f"Image detected: {image_path}")
            images = [image_path]
            prompt_text = ""  # No text for image-only messages

            # Look back for text context if this is an image
            if len(history) > 1:
                prev_message = history[-2][0]
                if isinstance(prev_message, str) and not prev_message.startswith(_IMAGE_MD_PREFIX):
                    prompt_text = prev_message
                    logger.info(f"Using text context from previous message: {prompt_text}")

        # Process message through respond function
        history[-1][1] = ""

        for response in respond(
            prompt_text,   # Text message, or text context for an image
            images,        # Current image list, or None for text-only
            history[:-1],  # Previous history
            system_msg,
            max_tokens,
            temperature,
            top_p,
            freq_penalty,
            seed,
            provider,
            api_key,
            custom_model,
            search_term,
            selected_model,
            tts_enabled,
            tts_server
        ):
            history[-1][1] = response
            yield history

    # Event handlers - only using the MultimodalTextbox's built-in submit functionality
    msg.submit(
        user,
        [msg, chatbot],
        [chatbot],
        queue=False
    ).then(
        bot,
        [chatbot, system_message_box, max_tokens_slider, temperature_slider, top_p_slider,
         frequency_penalty_slider, seed_slider, provider_radio, byok_textbox, custom_model_box,
         model_search_box, featured_model_radio, tts_enabled, tts_server],
        [chatbot]
    ).then(
        lambda: {"text": "", "files": []},  # Clear inputs after submission
        None,
        [msg]
    )

    # Connect the model filter to update the radio choices
    model_search_box.change(
        fn=filter_models,
        inputs=model_search_box,
        outputs=featured_model_radio
    )
    logger.info("Model search box change event linked.")

    # Connect the featured model radio to update the custom model box
    featured_model_radio.change(
        fn=set_custom_model_from_radio,
        inputs=featured_model_radio,
        outputs=custom_model_box
    )
    logger.info("Featured model radio button change event linked.")

    # Connect the BYOK textbox to validate provider selection
    byok_textbox.change(
        fn=validate_provider,
        inputs=[byok_textbox, provider_radio],
        outputs=provider_radio
    )
    logger.info("BYOK textbox change event linked.")

    # Also validate provider when the radio changes to ensure consistency
    provider_radio.change(
        fn=validate_provider,
        inputs=[byok_textbox, provider_radio],
        outputs=provider_radio
    )
    logger.info("Provider radio button change event linked.")

    # Update TTS server dropdown visibility based on the TTS toggle
    tts_enabled.change(
        lambda enabled: gr.update(visible=enabled and len(list_mcp_servers()) > 0),
        inputs=tts_enabled,
        outputs=tts_server
    )

logger.info("Gradio interface initialized.")

if __name__ == "__main__":
    logger.info("Launching the demo application.")
    demo.launch(show_api=True)