Spaces:
Running
Running
import gradio as gr | |
from huggingface_hub import InferenceClient | |
import os | |
import json | |
import base64 | |
from PIL import Image | |
import io | |
import requests | |
from typing import Dict, List, Optional, Any, Union | |
import time | |
import logging | |
# Setup logging | |
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') | |
logger = logging.getLogger(__name__) | |
ACCESS_TOKEN = os.getenv("HF_TOKEN") | |
logger.info("Access token loaded.") | |
# MCP Client Configuration | |
MCP_SERVERS = {} | |
try: | |
mcp_config = os.getenv("MCP_CONFIG") | |
if mcp_config: | |
MCP_SERVERS = json.loads(mcp_config) | |
logger.info(f"Loaded MCP configuration: {len(MCP_SERVERS)} servers defined") | |
except Exception as e: | |
logger.error(f"Error loading MCP configuration: {e}") | |
# Function to encode image to base64 | |
def encode_image(image_path): | |
if not image_path: | |
logger.warning("No image path provided") | |
return None | |
try: | |
logger.info(f"Encoding image from path: {image_path}") | |
# If it's already a PIL Image | |
if isinstance(image_path, Image.Image): | |
image = image_path | |
else: | |
# Try to open the image file | |
image = Image.open(image_path) | |
# Convert to RGB if image has an alpha channel (RGBA) | |
if image.mode == 'RGBA': | |
image = image.convert('RGB') | |
# Encode to base64 | |
buffered = io.BytesIO() | |
image.save(buffered, format="JPEG") | |
img_str = base64.b64encode(buffered.getvalue()).decode("utf-8") | |
logger.info("Image encoded successfully") | |
return img_str | |
except Exception as e: | |
logger.error(f"Error encoding image: {e}") | |
return None | |
# MCP Client implementation | |
class MCPClient: | |
def __init__(self, server_url: str): | |
self.server_url = server_url | |
self.session_id = None | |
logger.info(f"Initialized MCP Client for server: {server_url}") | |
def connect(self) -> bool: | |
"""Establish connection with the MCP server""" | |
try: | |
response = requests.post( | |
f"{self.server_url}/connect", | |
json={"client": "Serverless-TextGen-Hub", "version": "1.0.0"} | |
) | |
if response.status_code == 200: | |
result = response.json() | |
self.session_id = result.get("session_id") | |
logger.info(f"Connected to MCP server with session ID: {self.session_id}") | |
return True | |
else: | |
logger.error(f"Failed to connect to MCP server: {response.status_code} - {response.text}") | |
return False | |
except Exception as e: | |
logger.error(f"Error connecting to MCP server: {e}") | |
return False | |
def list_tools(self) -> List[Dict]: | |
"""List available tools from the MCP server""" | |
if not self.session_id: | |
if not self.connect(): | |
return [] | |
try: | |
response = requests.get( | |
f"{self.server_url}/tools/list", | |
headers={"X-MCP-Session": self.session_id} | |
) | |
if response.status_code == 200: | |
result = response.json() | |
tools = result.get("tools", []) | |
logger.info(f"Retrieved {len(tools)} tools from MCP server") | |
return tools | |
else: | |
logger.error(f"Failed to list tools: {response.status_code} - {response.text}") | |
return [] | |
except Exception as e: | |
logger.error(f"Error listing tools: {e}") | |
return [] | |
def call_tool(self, tool_name: str, args: Dict) -> Dict: | |
"""Call a tool on the MCP server""" | |
if not self.session_id: | |
if not self.connect(): | |
return {"error": "Not connected to MCP server"} | |
try: | |
response = requests.post( | |
f"{self.server_url}/tools/call", | |
headers={"X-MCP-Session": self.session_id}, | |
json={"name": tool_name, "arguments": args} | |
) | |
if response.status_code == 200: | |
result = response.json() | |
logger.info(f"Successfully called tool {tool_name}") | |
return result | |
else: | |
error_msg = f"Failed to call tool {tool_name}: {response.status_code} - {response.text}" | |
logger.error(error_msg) | |
return {"error": error_msg} | |
except Exception as e: | |
error_msg = f"Error calling tool {tool_name}: {e}" | |
logger.error(error_msg) | |
return {"error": error_msg} | |
# Text-to-speech client function | |
def text_to_speech(text: str, server_name: str = None) -> Optional[str]: | |
""" | |
Convert text to speech using an MCP TTS server | |
Returns an audio URL that can be embedded in the chat | |
""" | |
if not server_name or server_name not in MCP_SERVERS: | |
logger.warning(f"TTS server {server_name} not configured") | |
return None | |
server_url = MCP_SERVERS[server_name].get("url") | |
if not server_url: | |
logger.warning(f"No URL found for TTS server {server_name}") | |
return None | |
client = MCPClient(server_url) | |
# List available tools to find the TTS tool | |
tools = client.list_tools() | |
tts_tool = next((t for t in tools if "text_to_audio" in t["name"] or "tts" in t["name"]), None) | |
if not tts_tool: | |
logger.warning(f"No TTS tool found on server {server_name}") | |
return None | |
# Call the TTS tool | |
result = client.call_tool(tts_tool["name"], {"text": text, "speed": 1.0}) | |
if "error" in result: | |
logger.error(f"TTS error: {result['error']}") | |
return None | |
# Process the result - usually a base64 encoded WAV | |
audio_data = result.get("audio") or result.get("content") or result.get("result") | |
if isinstance(audio_data, str) and audio_data.startswith("data:audio"): | |
# Already a data URL | |
return audio_data | |
elif isinstance(audio_data, str): | |
# Assume it's base64 encoded | |
return f"data:audio/wav;base64,{audio_data}" | |
else: | |
logger.error(f"Unexpected TTS result format: {type(audio_data)}") | |
return None | |
def respond( | |
message, | |
image_files, # Changed parameter name and structure | |
history: list[tuple[str, str]], | |
system_message, | |
max_tokens, | |
temperature, | |
top_p, | |
frequency_penalty, | |
seed, | |
provider, | |
custom_api_key, | |
custom_model, | |
model_search_term, | |
selected_model, | |
tts_enabled=False, | |
tts_server=None | |
): | |
logger.info(f"Received message: {message}") | |
logger.info(f"Received {len(image_files) if image_files else 0} images") | |
logger.info(f"History: {history}") | |
logger.info(f"System message: {system_message}") | |
logger.info(f"Max tokens: {max_tokens}, Temperature: {temperature}, Top-P: {top_p}") | |
logger.info(f"Frequency Penalty: {frequency_penalty}, Seed: {seed}") | |
logger.info(f"Selected provider: {provider}") | |
logger.info(f"Custom API Key provided: {bool(custom_api_key.strip())}") | |
logger.info(f"Selected model (custom_model): {custom_model}") | |
logger.info(f"Model search term: {model_search_term}") | |
logger.info(f"Selected model from radio: {selected_model}") | |
logger.info(f"TTS enabled: {tts_enabled}, TTS server: {tts_server}") | |
# Determine which token to use | |
token_to_use = custom_api_key if custom_api_key.strip() != "" else ACCESS_TOKEN | |
if custom_api_key.strip() != "": | |
logger.info("USING CUSTOM API KEY: BYOK token provided by user is being used for authentication") | |
else: | |
logger.info("USING DEFAULT API KEY: Environment variable HF_TOKEN is being used for authentication") | |
# Initialize the Inference Client with the provider and appropriate token | |
client = InferenceClient(token=token_to_use, provider=provider) | |
logger.info(f"Hugging Face Inference Client initialized with {provider} provider.") | |
# Convert seed to None if -1 (meaning random) | |
if seed == -1: | |
seed = None | |
# Create multimodal content if images are present | |
if image_files and len(image_files) > 0: | |
# Process the user message to include images | |
user_content = [] | |
# Add text part if there is any | |
if message and message.strip(): | |
user_content.append({ | |
"type": "text", | |
"text": message | |
}) | |
# Add image parts | |
for img in image_files: | |
if img is not None: | |
# Get raw image data from path | |
try: | |
encoded_image = encode_image(img) | |
if encoded_image: | |
user_content.append({ | |
"type": "image_url", | |
"image_url": { | |
"url": f"data:image/jpeg;base64,{encoded_image}" | |
} | |
}) | |
except Exception as e: | |
logger.error(f"Error encoding image: {e}") | |
else: | |
# Text-only message | |
user_content = message | |
# Prepare messages in the format expected by the API | |
messages = [{"role": "system", "content": system_message}] | |
logger.info("Initial messages array constructed.") | |
# Add conversation history to the context | |
for val in history: | |
user_part = val[0] | |
assistant_part = val[1] | |
if user_part: | |
# Handle both text-only and multimodal messages in history | |
if isinstance(user_part, tuple) and len(user_part) == 2: | |
# This is a multimodal message with text and images | |
history_content = [] | |
if user_part[0]: # Text | |
history_content.append({ | |
"type": "text", | |
"text": user_part[0] | |
}) | |
for img in user_part[1]: # Images | |
if img: | |
try: | |
encoded_img = encode_image(img) | |
if encoded_img: | |
history_content.append({ | |
"type": "image_url", | |
"image_url": { | |
"url": f"data:image/jpeg;base64,{encoded_img}" | |
} | |
}) | |
except Exception as e: | |
logger.error(f"Error encoding history image: {e}") | |
messages.append({"role": "user", "content": history_content}) | |
else: | |
# Regular text message | |
messages.append({"role": "user", "content": user_part}) | |
logger.info(f"Added user message to context (type: {type(user_part)})") | |
if assistant_part: | |
messages.append({"role": "assistant", "content": assistant_part}) | |
logger.info(f"Added assistant message to context: {assistant_part}") | |
# Append the latest user message | |
messages.append({"role": "user", "content": user_content}) | |
logger.info(f"Latest user message appended (content type: {type(user_content)})") | |
# Determine which model to use, prioritizing custom_model if provided | |
model_to_use = custom_model.strip() if custom_model.strip() != "" else selected_model | |
logger.info(f"Model selected for inference: {model_to_use}") | |
# Start with an empty string to build the response as tokens stream in | |
response = "" | |
logger.info(f"Sending request to {provider} provider.") | |
# Prepare parameters for the chat completion request | |
parameters = { | |
"max_tokens": max_tokens, | |
"temperature": temperature, | |
"top_p": top_p, | |
"frequency_penalty": frequency_penalty, | |
} | |
if seed is not None: | |
parameters["seed"] = seed | |
# Use the InferenceClient for making the request | |
try: | |
# Create a generator for the streaming response | |
stream = client.chat_completion( | |
model=model_to_use, | |
messages=messages, | |
stream=True, | |
**parameters | |
) | |
logger.info("Received tokens: ") | |
# Process the streaming response | |
for chunk in stream: | |
if hasattr(chunk, 'choices') and len(chunk.choices) > 0: | |
# Extract the content from the response | |
if hasattr(chunk.choices[0], 'delta') and hasattr(chunk.choices[0].delta, 'content'): | |
token_text = chunk.choices[0].delta.content | |
if token_text: | |
print(token_text, end="", flush=True) | |
response += token_text | |
yield response | |
# If TTS is enabled and we have a response, convert it to speech | |
if tts_enabled and tts_server and response: | |
logger.info(f"Converting response to speech using TTS server: {tts_server}") | |
try: | |
audio_url = text_to_speech(response, tts_server) | |
if audio_url: | |
# Add audio tag to the end of the response | |
response += f"\n\n<audio src='{audio_url}' controls></audio>" | |
yield response | |
else: | |
logger.warning("TTS conversion failed, continuing without audio") | |
except Exception as e: | |
logger.error(f"Error in TTS conversion: {e}") | |
# Continue without TTS if there's an error | |
print() | |
except Exception as e: | |
logger.error(f"Error during inference: {e}") | |
response += f"\nError: {str(e)}" | |
yield response | |
logger.info("Completed response generation.") | |
# Function to validate provider selection based on BYOK | |
def validate_provider(api_key, provider): | |
if not api_key.strip() and provider != "hf-inference": | |
return gr.update(value="hf-inference") | |
return gr.update(value=provider) | |
# Function to list available MCP servers | |
def list_mcp_servers(): | |
"""List all configured MCP servers""" | |
return list(MCP_SERVERS.keys()) | |
# GRADIO UI | |
with gr.Blocks(theme="Nymbo/Nymbo_Theme") as demo: | |
# Create the chatbot component | |
chatbot = gr.Chatbot( | |
height=600, | |
show_copy_button=True, | |
placeholder="Select a model and begin chatting. Now supports multiple inference providers, multimodal inputs, and MCP servers", | |
layout="panel" | |
) | |
logger.info("Chatbot interface created.") | |
# Multimodal textbox for messages (combines text and file uploads) | |
msg = gr.MultimodalTextbox( | |
placeholder="Type a message or upload images...", | |
show_label=False, | |
container=False, | |
scale=12, | |
file_types=["image"], | |
file_count="multiple", | |
sources=["upload"] | |
) | |
# Create accordion for settings | |
with gr.Accordion("Settings", open=False): | |
# System message | |
system_message_box = gr.Textbox( | |
value="You are a helpful AI assistant that can understand images and text.", | |
placeholder="You are a helpful assistant.", | |
label="System Prompt" | |
) | |
# Generation parameters | |
with gr.Row(): | |
with gr.Column(): | |
max_tokens_slider = gr.Slider( | |
minimum=1, | |
maximum=4096, | |
value=512, | |
step=1, | |
label="Max tokens" | |
) | |
temperature_slider = gr.Slider( | |
minimum=0.1, | |
maximum=4.0, | |
value=0.7, | |
step=0.1, | |
label="Temperature" | |
) | |
top_p_slider = gr.Slider( | |
minimum=0.1, | |
maximum=1.0, | |
value=0.95, | |
step=0.05, | |
label="Top-P" | |
) | |
with gr.Column(): | |
frequency_penalty_slider = gr.Slider( | |
minimum=-2.0, | |
maximum=2.0, | |
value=0.0, | |
step=0.1, | |
label="Frequency Penalty" | |
) | |
seed_slider = gr.Slider( | |
minimum=-1, | |
maximum=65535, | |
value=-1, | |
step=1, | |
label="Seed (-1 for random)" | |
) | |
# Provider selection | |
providers_list = [ | |
"hf-inference", # Default Hugging Face Inference | |
"cerebras", # Cerebras provider | |
"together", # Together AI | |
"sambanova", # SambaNova | |
"novita", # Novita AI | |
"cohere", # Cohere | |
"fireworks-ai", # Fireworks AI | |
"hyperbolic", # Hyperbolic | |
"nebius", # Nebius | |
] | |
provider_radio = gr.Radio( | |
choices=providers_list, | |
value="hf-inference", | |
label="Inference Provider", | |
) | |
# New BYOK textbox | |
byok_textbox = gr.Textbox( | |
value="", | |
label="BYOK (Bring Your Own Key)", | |
info="Enter a custom Hugging Face API key here. When empty, only 'hf-inference' provider can be used.", | |
placeholder="Enter your Hugging Face API token", | |
type="password" # Hide the API key for security | |
) | |
# Custom model box | |
custom_model_box = gr.Textbox( | |
value="", | |
label="Custom Model", | |
info="(Optional) Provide a custom Hugging Face model path. Overrides any selected featured model.", | |
placeholder="meta-llama/Llama-3.3-70B-Instruct" | |
) | |
# Model search | |
model_search_box = gr.Textbox( | |
label="Filter Models", | |
placeholder="Search for a featured model...", | |
lines=1 | |
) | |
# Featured models list | |
# Updated to include multimodal models | |
models_list = [ | |
"meta-llama/Llama-3.2-11B-Vision-Instruct", | |
"meta-llama/Llama-3.3-70B-Instruct", | |
"meta-llama/Llama-3.1-70B-Instruct", | |
"meta-llama/Llama-3.0-70B-Instruct", | |
"meta-llama/Llama-3.2-3B-Instruct", | |
"meta-llama/Llama-3.2-1B-Instruct", | |
"meta-llama/Llama-3.1-8B-Instruct", | |
"NousResearch/Hermes-3-Llama-3.1-8B", | |
"NousResearch/Nous-Hermes-2-Mixtral-8x7B-DPO", | |
"mistralai/Mistral-Nemo-Instruct-2407", | |
"mistralai/Mixtral-8x7B-Instruct-v0.1", | |
"mistralai/Mistral-7B-Instruct-v0.3", | |
"mistralai/Mistral-7B-Instruct-v0.2", | |
"Qwen/Qwen3-235B-A22B", | |
"Qwen/Qwen3-32B", | |
"Qwen/Qwen2.5-72B-Instruct", | |
"Qwen/Qwen2.5-3B-Instruct", | |
"Qwen/Qwen2.5-0.5B-Instruct", | |
"Qwen/QwQ-32B", | |
"Qwen/Qwen2.5-Coder-32B-Instruct", | |
"microsoft/Phi-3.5-mini-instruct", | |
"microsoft/Phi-3-mini-128k-instruct", | |
"microsoft/Phi-3-mini-4k-instruct", | |
] | |
featured_model_radio = gr.Radio( | |
label="Select a model below", | |
choices=models_list, | |
value="meta-llama/Llama-3.2-11B-Vision-Instruct", # Default to a multimodal model | |
interactive=True | |
) | |
gr.Markdown("[View all Text-to-Text models](https://huggingface.co/models?inference_provider=all&pipeline_tag=text-generation&sort=trending) | [View all multimodal models](https://huggingface.co/models?inference_provider=all&pipeline_tag=image-text-to-text&sort=trending)") | |
# MCP TTS integration | |
with gr.Accordion("MCP Integration", open=False): | |
gr.Markdown("## Model Context Protocol (MCP) Integration") | |
gr.Markdown("Connect to MCP servers to extend functionality.") | |
tts_enabled = gr.Checkbox( | |
label="Enable Text-to-Speech", | |
value=False, | |
info="When enabled, responses will be converted to speech using the selected MCP TTS server" | |
) | |
# Create dropdown for available MCP servers | |
available_servers = list_mcp_servers() | |
tts_server = gr.Dropdown( | |
label="TTS Server", | |
choices=available_servers, | |
value=available_servers[0] if available_servers else None, | |
interactive=True, | |
visible=len(available_servers) > 0 | |
) | |
# If no servers configured, show a message | |
if not available_servers: | |
gr.Markdown(""" | |
No MCP servers configured. Add them using the MCP_CONFIG environment variable: | |
```json | |
{ | |
"kokoroTTS": { | |
"url": "https://your-kokoro-tts-server/gradio_api/mcp/sse" | |
} | |
} | |
``` | |
""") | |
# Chat history state | |
chat_history = gr.State([]) | |
# Function to filter models | |
def filter_models(search_term): | |
logger.info(f"Filtering models with search term: {search_term}") | |
filtered = [m for m in models_list if search_term.lower() in m.lower()] | |
logger.info(f"Filtered models: {filtered}") | |
return gr.update(choices=filtered) | |
# Function to set custom model from radio | |
def set_custom_model_from_radio(selected): | |
logger.info(f"Featured model selected: {selected}") | |
return selected | |
# Function for the chat interface | |
def user(user_message, history): | |
# Debug logging for troubleshooting | |
logger.info(f"User message received: {user_message}") | |
# Skip if message is empty (no text and no files) | |
if not user_message or (not user_message.get("text") and not user_message.get("files")): | |
logger.info("Empty message, skipping") | |
return history | |
# Prepare multimodal message format | |
text_content = user_message.get("text", "").strip() | |
files = user_message.get("files", []) | |
logger.info(f"Text content: {text_content}") | |
logger.info(f"Files: {files}") | |
# If both text and files are empty, skip | |
if not text_content and not files: | |
logger.info("No content to display") | |
return history | |
# Add message with images to history | |
if files and len(files) > 0: | |
# Add text message first if it exists | |
if text_content: | |
# Add a separate text message | |
logger.info(f"Adding text message: {text_content}") | |
history.append([text_content, None]) | |
# Then add each image file separately | |
for file_path in files: | |
if file_path and isinstance(file_path, str): | |
logger.info(f"Adding image: {file_path}") | |
# Add image as a separate message with no text | |
history.append([f"", None]) | |
return history | |
else: | |
# For text-only messages | |
logger.info(f"Adding text-only message: {text_content}") | |
history.append([text_content, None]) | |
return history | |
# Define bot response function | |
def bot(history, system_msg, max_tokens, temperature, top_p, freq_penalty, seed, provider, api_key, custom_model, search_term, selected_model, tts_enabled, tts_server): | |
# Check if history is valid | |
if not history or len(history) == 0: | |
logger.info("No history to process") | |
return history | |
# Get the most recent message and detect if it's an image | |
user_message = history[-1][0] | |
logger.info(f"Processing user message: {user_message}") | |
is_image = False | |
image_path = None | |
text_content = user_message | |
# Check if this is an image message (marked with ![Image]) | |
if isinstance(user_message, str) and user_message.startswith(": | |
is_image = True | |
# Extract image path from markdown format  | |
image_path = user_message.replace(".replace(")", "") | |
logger.info(f"Image detected: {image_path}") | |
text_content = "" # No text for image-only messages | |
# Look back for text context if this is an image | |
text_context = "" | |
if is_image and len(history) > 1: | |
# Use the previous message as context if it's text | |
prev_message = history[-2][0] | |
if isinstance(prev_message, str) and not prev_message.startswith(": | |
text_context = prev_message | |
logger.info(f"Using text context from previous message: {text_context}") | |
# Process message through respond function | |
history[-1][1] = "" | |
# Use either the image or text for the API | |
if is_image: | |
# For image messages | |
for response in respond( | |
text_context, # Text context from previous message if any | |
[image_path], # Current image | |
history[:-1], # Previous history | |
system_msg, | |
max_tokens, | |
temperature, | |
top_p, | |
freq_penalty, | |
seed, | |
provider, | |
api_key, | |
custom_model, | |
search_term, | |
selected_model, | |
tts_enabled, | |
tts_server | |
): | |
history[-1][1] = response | |
yield history | |
else: | |
# For text-only messages | |
for response in respond( | |
text_content, # Text message | |
None, # No image | |
history[:-1], # Previous history | |
system_msg, | |
max_tokens, | |
temperature, | |
top_p, | |
freq_penalty, | |
seed, | |
provider, | |
api_key, | |
custom_model, | |
search_term, | |
selected_model, | |
tts_enabled, | |
tts_server | |
): | |
history[-1][1] = response | |
yield history | |
# Event handlers - only using the MultimodalTextbox's built-in submit functionality | |
msg.submit( | |
user, | |
[msg, chatbot], | |
[chatbot], | |
queue=False | |
).then( | |
bot, | |
[chatbot, system_message_box, max_tokens_slider, temperature_slider, top_p_slider, | |
frequency_penalty_slider, seed_slider, provider_radio, byok_textbox, custom_model_box, | |
model_search_box, featured_model_radio, tts_enabled, tts_server], | |
[chatbot] | |
).then( | |
lambda: {"text": "", "files": []}, # Clear inputs after submission | |
None, | |
[msg] | |
) | |
# Connect the model filter to update the radio choices | |
model_search_box.change( | |
fn=filter_models, | |
inputs=model_search_box, | |
outputs=featured_model_radio | |
) | |
logger.info("Model search box change event linked.") | |
# Connect the featured model radio to update the custom model box | |
featured_model_radio.change( | |
fn=set_custom_model_from_radio, | |
inputs=featured_model_radio, | |
outputs=custom_model_box | |
) | |
logger.info("Featured model radio button change event linked.") | |
# Connect the BYOK textbox to validate provider selection | |
byok_textbox.change( | |
fn=validate_provider, | |
inputs=[byok_textbox, provider_radio], | |
outputs=provider_radio | |
) | |
logger.info("BYOK textbox change event linked.") | |
# Also validate provider when the radio changes to ensure consistency | |
provider_radio.change( | |
fn=validate_provider, | |
inputs=[byok_textbox, provider_radio], | |
outputs=provider_radio | |
) | |
logger.info("Provider radio button change event linked.") | |
# Update TTS server dropdown visibility based on the TTS toggle | |
tts_enabled.change( | |
lambda enabled: gr.update(visible=enabled and len(list_mcp_servers()) > 0), | |
inputs=tts_enabled, | |
outputs=tts_server | |
) | |
logger.info("Gradio interface initialized.") | |
if __name__ == "__main__": | |
logger.info("Launching the demo application.") | |
demo.launch(show_api=True) |