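"""Serverless TextGen Hub.

A Gradio chat app that streams responses from Hugging Face inference
providers, accepts multimodal (text + image) input, and can optionally
route replies through MCP text-to-speech servers configured via the
MCP_CONFIG environment variable.
"""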
import gradio as gr
from huggingface_hub import InferenceClient
import os
import json
import base64
from PIL import Image
import io
import requests
from typing import Dict, List, Optional, Any, Union
import time
import logging
# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
ACCESS_TOKEN = os.getenv("HF_TOKEN")
logger.info("Access token loaded.")
# MCP Client Configuration
MCP_SERVERS = {}
try:
    mcp_config = os.getenv("MCP_CONFIG")
    if mcp_config:
        MCP_SERVERS = json.loads(mcp_config)
        logger.info(f"Loaded MCP configuration: {len(MCP_SERVERS)} servers defined")
except Exception as e:
    logger.error(f"Error loading MCP configuration: {e}")
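# Example MCP_CONFIG value (same shape as shown in the MCP Integration
# accordion below); the server name and URL are illustrative:
# {"kokoroTTS": {"url": "https://your-kokoro-tts-server/gradio_api/mcp/sse"}}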
# Function to encode image to base64
def encode_image(image_path):
    if not image_path:
        logger.warning("No image path provided")
        return None
    try:
        logger.info(f"Encoding image from path: {image_path}")
        # If it's already a PIL Image
        if isinstance(image_path, Image.Image):
            image = image_path
        else:
            # Try to open the image file
            image = Image.open(image_path)
        # Convert to RGB if image has an alpha channel (RGBA)
        if image.mode == 'RGBA':
            image = image.convert('RGB')
        # Encode to base64
        buffered = io.BytesIO()
        image.save(buffered, format="JPEG")
        img_str = base64.b64encode(buffered.getvalue()).decode("utf-8")
        logger.info("Image encoded successfully")
        return img_str
    except Exception as e:
        logger.error(f"Error encoding image: {e}")
        return None
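# Note: encode_image returns a raw base64 string (no data-URI prefix); the
# callers below wrap it as f"data:image/jpeg;base64,{...}" before sending it
# to the model.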
# MCP Client implementation
class MCPClient:
    def __init__(self, server_url: str):
        self.server_url = server_url
        self.session_id = None
        logger.info(f"Initialized MCP Client for server: {server_url}")

    def connect(self) -> bool:
        """Establish connection with the MCP server"""
        try:
            response = requests.post(
                f"{self.server_url}/connect",
                json={"client": "Serverless-TextGen-Hub", "version": "1.0.0"}
            )
            if response.status_code == 200:
                result = response.json()
                self.session_id = result.get("session_id")
                logger.info(f"Connected to MCP server with session ID: {self.session_id}")
                return True
            else:
                logger.error(f"Failed to connect to MCP server: {response.status_code} - {response.text}")
                return False
        except Exception as e:
            logger.error(f"Error connecting to MCP server: {e}")
            return False

    def list_tools(self) -> List[Dict]:
        """List available tools from the MCP server"""
        if not self.session_id:
            if not self.connect():
                return []
        try:
            response = requests.get(
                f"{self.server_url}/tools/list",
                headers={"X-MCP-Session": self.session_id}
            )
            if response.status_code == 200:
                result = response.json()
                tools = result.get("tools", [])
                logger.info(f"Retrieved {len(tools)} tools from MCP server")
                return tools
            else:
                logger.error(f"Failed to list tools: {response.status_code} - {response.text}")
                return []
        except Exception as e:
            logger.error(f"Error listing tools: {e}")
            return []

    def call_tool(self, tool_name: str, args: Dict) -> Dict:
        """Call a tool on the MCP server"""
        if not self.session_id:
            if not self.connect():
                return {"error": "Not connected to MCP server"}
        try:
            response = requests.post(
                f"{self.server_url}/tools/call",
                headers={"X-MCP-Session": self.session_id},
                json={"name": tool_name, "arguments": args}
            )
            if response.status_code == 200:
                result = response.json()
                logger.info(f"Successfully called tool {tool_name}")
                return result
            else:
                error_msg = f"Failed to call tool {tool_name}: {response.status_code} - {response.text}"
                logger.error(error_msg)
                return {"error": error_msg}
        except Exception as e:
            error_msg = f"Error calling tool {tool_name}: {e}"
            logger.error(error_msg)
            return {"error": error_msg}
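# Typical call pattern (sketch; the server URL and tool name are illustrative
# and depend on what is configured in MCP_CONFIG):
#   client = MCPClient("https://your-mcp-server")
#   tools = client.list_tools()
#   result = client.call_tool("text_to_audio", {"text": "Hello", "speed": 1.0})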
# Text-to-speech client function
def text_to_speech(text: str, server_name: Optional[str] = None) -> Optional[str]:
    """
    Convert text to speech using an MCP TTS server.
    Returns an audio URL that can be embedded in the chat.
    """
    if not server_name or server_name not in MCP_SERVERS:
        logger.warning(f"TTS server {server_name} not configured")
        return None
    server_url = MCP_SERVERS[server_name].get("url")
    if not server_url:
        logger.warning(f"No URL found for TTS server {server_name}")
        return None
    client = MCPClient(server_url)

    # List available tools to find the TTS tool
    tools = client.list_tools()
    tts_tool = next((t for t in tools if "text_to_audio" in t["name"] or "tts" in t["name"]), None)
    if not tts_tool:
        logger.warning(f"No TTS tool found on server {server_name}")
        return None

    # Call the TTS tool
    result = client.call_tool(tts_tool["name"], {"text": text, "speed": 1.0})
    if "error" in result:
        logger.error(f"TTS error: {result['error']}")
        return None

    # Process the result - usually a base64-encoded WAV
    audio_data = result.get("audio") or result.get("content") or result.get("result")
    if isinstance(audio_data, str) and audio_data.startswith("data:audio"):
        # Already a data URL
        return audio_data
    elif isinstance(audio_data, str):
        # Assume it's base64 encoded
        return f"data:audio/wav;base64,{audio_data}"
    else:
        logger.error(f"Unexpected TTS result format: {type(audio_data)}")
        return None
def respond(
    message,
    image_files,  # list of image file paths (or None) for the current turn
    history: list[tuple[str, str]],
    system_message,
    max_tokens,
    temperature,
    top_p,
    frequency_penalty,
    seed,
    provider,
    custom_api_key,
    custom_model,
    model_search_term,
    selected_model,
    tts_enabled=False,
    tts_server=None
):
    logger.info(f"Received message: {message}")
    logger.info(f"Received {len(image_files) if image_files else 0} images")
    logger.info(f"History: {history}")
    logger.info(f"System message: {system_message}")
    logger.info(f"Max tokens: {max_tokens}, Temperature: {temperature}, Top-P: {top_p}")
    logger.info(f"Frequency Penalty: {frequency_penalty}, Seed: {seed}")
    logger.info(f"Selected provider: {provider}")
    logger.info(f"Custom API Key provided: {bool(custom_api_key.strip())}")
    logger.info(f"Selected model (custom_model): {custom_model}")
    logger.info(f"Model search term: {model_search_term}")
    logger.info(f"Selected model from radio: {selected_model}")
    logger.info(f"TTS enabled: {tts_enabled}, TTS server: {tts_server}")

    # Determine which token to use
    token_to_use = custom_api_key if custom_api_key.strip() != "" else ACCESS_TOKEN
    if custom_api_key.strip() != "":
        logger.info("USING CUSTOM API KEY: BYOK token provided by user is being used for authentication")
    else:
        logger.info("USING DEFAULT API KEY: Environment variable HF_TOKEN is being used for authentication")

    # Initialize the Inference Client with the provider and appropriate token
    client = InferenceClient(token=token_to_use, provider=provider)
    logger.info(f"Hugging Face Inference Client initialized with {provider} provider.")

    # Convert seed to None if -1 (meaning random)
    if seed == -1:
        seed = None

    # Create multimodal content if images are present
    if image_files and len(image_files) > 0:
        # Process the user message to include images
        user_content = []
        # Add text part if there is any
        if message and message.strip():
            user_content.append({
                "type": "text",
                "text": message
            })
        # Add image parts
        for img in image_files:
            if img is not None:
                # Get raw image data from path
                try:
                    encoded_image = encode_image(img)
                    if encoded_image:
                        user_content.append({
                            "type": "image_url",
                            "image_url": {
                                "url": f"data:image/jpeg;base64,{encoded_image}"
                            }
                        })
                except Exception as e:
                    logger.error(f"Error encoding image: {e}")
    else:
        # Text-only message
        user_content = message

    # Prepare messages in the format expected by the API
    messages = [{"role": "system", "content": system_message}]
    logger.info("Initial messages array constructed.")

    # Add conversation history to the context
    for val in history:
        user_part = val[0]
        assistant_part = val[1]
        if user_part:
            # Handle both text-only and multimodal messages in history
            if isinstance(user_part, tuple) and len(user_part) == 2:
                # This is a multimodal message with text and images
                history_content = []
                if user_part[0]:  # Text
                    history_content.append({
                        "type": "text",
                        "text": user_part[0]
                    })
                for img in user_part[1]:  # Images
                    if img:
                        try:
                            encoded_img = encode_image(img)
                            if encoded_img:
                                history_content.append({
                                    "type": "image_url",
                                    "image_url": {
                                        "url": f"data:image/jpeg;base64,{encoded_img}"
                                    }
                                })
                        except Exception as e:
                            logger.error(f"Error encoding history image: {e}")
                messages.append({"role": "user", "content": history_content})
            else:
                # Regular text message
                messages.append({"role": "user", "content": user_part})
            logger.info(f"Added user message to context (type: {type(user_part)})")
        if assistant_part:
            messages.append({"role": "assistant", "content": assistant_part})
            logger.info(f"Added assistant message to context: {assistant_part}")

    # Append the latest user message
    messages.append({"role": "user", "content": user_content})
    logger.info(f"Latest user message appended (content type: {type(user_content)})")

    # Determine which model to use, prioritizing custom_model if provided
    model_to_use = custom_model.strip() if custom_model.strip() != "" else selected_model
    logger.info(f"Model selected for inference: {model_to_use}")

    # Start with an empty string to build the response as tokens stream in
    response = ""
    logger.info(f"Sending request to {provider} provider.")

    # Prepare parameters for the chat completion request
    parameters = {
        "max_tokens": max_tokens,
        "temperature": temperature,
        "top_p": top_p,
        "frequency_penalty": frequency_penalty,
    }
    if seed is not None:
        parameters["seed"] = seed

    # Use the InferenceClient for making the request
    try:
        # Create a generator for the streaming response
        stream = client.chat_completion(
            model=model_to_use,
            messages=messages,
            stream=True,
            **parameters
        )
        logger.info("Received tokens: ")

        # Process the streaming response
        for chunk in stream:
            if hasattr(chunk, 'choices') and len(chunk.choices) > 0:
                # Extract the content from the response
                if hasattr(chunk.choices[0], 'delta') and hasattr(chunk.choices[0].delta, 'content'):
                    token_text = chunk.choices[0].delta.content
                    if token_text:
                        print(token_text, end="", flush=True)
                        response += token_text
                        yield response

        # If TTS is enabled and we have a response, convert it to speech
        if tts_enabled and tts_server and response:
            logger.info(f"Converting response to speech using TTS server: {tts_server}")
            try:
                audio_url = text_to_speech(response, tts_server)
                if audio_url:
                    # Add audio tag to the end of the response
                    response += f"\n\n<audio src='{audio_url}' controls></audio>"
                    yield response
                else:
                    logger.warning("TTS conversion failed, continuing without audio")
            except Exception as e:
                logger.error(f"Error in TTS conversion: {e}")
                # Continue without TTS if there's an error
        print()
    except Exception as e:
        logger.error(f"Error during inference: {e}")
        response += f"\nError: {str(e)}"
        yield response

    logger.info("Completed response generation.")
# Function to validate provider selection based on BYOK
def validate_provider(api_key, provider):
    if not api_key.strip() and provider != "hf-inference":
        return gr.update(value="hf-inference")
    return gr.update(value=provider)
# Function to list available MCP servers
def list_mcp_servers():
    """List all configured MCP servers"""
    return list(MCP_SERVERS.keys())
# GRADIO UI
with gr.Blocks(theme="Nymbo/Nymbo_Theme") as demo:
    # Create the chatbot component
    chatbot = gr.Chatbot(
        height=600,
        show_copy_button=True,
        placeholder="Select a model and begin chatting. Now supports multiple inference providers, multimodal inputs, and MCP servers",
        layout="panel"
    )
    logger.info("Chatbot interface created.")

    # Multimodal textbox for messages (combines text and file uploads)
    msg = gr.MultimodalTextbox(
        placeholder="Type a message or upload images...",
        show_label=False,
        container=False,
        scale=12,
        file_types=["image"],
        file_count="multiple",
        sources=["upload"]
    )
    # Create accordion for settings
    with gr.Accordion("Settings", open=False):
        # System message
        system_message_box = gr.Textbox(
            value="You are a helpful AI assistant that can understand images and text.",
            placeholder="You are a helpful assistant.",
            label="System Prompt"
        )

        # Generation parameters
        with gr.Row():
            with gr.Column():
                max_tokens_slider = gr.Slider(
                    minimum=1,
                    maximum=4096,
                    value=512,
                    step=1,
                    label="Max tokens"
                )
                temperature_slider = gr.Slider(
                    minimum=0.1,
                    maximum=4.0,
                    value=0.7,
                    step=0.1,
                    label="Temperature"
                )
                top_p_slider = gr.Slider(
                    minimum=0.1,
                    maximum=1.0,
                    value=0.95,
                    step=0.05,
                    label="Top-P"
                )
            with gr.Column():
                frequency_penalty_slider = gr.Slider(
                    minimum=-2.0,
                    maximum=2.0,
                    value=0.0,
                    step=0.1,
                    label="Frequency Penalty"
                )
                seed_slider = gr.Slider(
                    minimum=-1,
                    maximum=65535,
                    value=-1,
                    step=1,
                    label="Seed (-1 for random)"
                )
        # Provider selection
        providers_list = [
            "hf-inference",  # Default Hugging Face Inference
            "cerebras",      # Cerebras provider
            "together",      # Together AI
            "sambanova",     # SambaNova
            "novita",        # Novita AI
            "cohere",        # Cohere
            "fireworks-ai",  # Fireworks AI
            "hyperbolic",    # Hyperbolic
            "nebius",        # Nebius
        ]
        provider_radio = gr.Radio(
            choices=providers_list,
            value="hf-inference",
            label="Inference Provider",
        )

        # BYOK textbox
        byok_textbox = gr.Textbox(
            value="",
            label="BYOK (Bring Your Own Key)",
            info="Enter a custom Hugging Face API key here. When empty, only the 'hf-inference' provider can be used.",
            placeholder="Enter your Hugging Face API token",
            type="password"  # Hide the API key for security
        )

        # Custom model box
        custom_model_box = gr.Textbox(
            value="",
            label="Custom Model",
            info="(Optional) Provide a custom Hugging Face model path. Overrides any selected featured model.",
            placeholder="meta-llama/Llama-3.3-70B-Instruct"
        )

        # Model search
        model_search_box = gr.Textbox(
            label="Filter Models",
            placeholder="Search for a featured model...",
            lines=1
        )

        # Featured models list (includes multimodal models)
        models_list = [
            "meta-llama/Llama-3.2-11B-Vision-Instruct",
            "meta-llama/Llama-3.3-70B-Instruct",
            "meta-llama/Llama-3.1-70B-Instruct",
            "meta-llama/Llama-3.0-70B-Instruct",
            "meta-llama/Llama-3.2-3B-Instruct",
            "meta-llama/Llama-3.2-1B-Instruct",
            "meta-llama/Llama-3.1-8B-Instruct",
            "NousResearch/Hermes-3-Llama-3.1-8B",
            "NousResearch/Nous-Hermes-2-Mixtral-8x7B-DPO",
            "mistralai/Mistral-Nemo-Instruct-2407",
            "mistralai/Mixtral-8x7B-Instruct-v0.1",
            "mistralai/Mistral-7B-Instruct-v0.3",
            "mistralai/Mistral-7B-Instruct-v0.2",
            "Qwen/Qwen3-235B-A22B",
            "Qwen/Qwen3-32B",
            "Qwen/Qwen2.5-72B-Instruct",
            "Qwen/Qwen2.5-3B-Instruct",
            "Qwen/Qwen2.5-0.5B-Instruct",
            "Qwen/QwQ-32B",
            "Qwen/Qwen2.5-Coder-32B-Instruct",
            "microsoft/Phi-3.5-mini-instruct",
            "microsoft/Phi-3-mini-128k-instruct",
            "microsoft/Phi-3-mini-4k-instruct",
        ]
        featured_model_radio = gr.Radio(
            label="Select a model below",
            choices=models_list,
            value="meta-llama/Llama-3.2-11B-Vision-Instruct",  # Default to a multimodal model
            interactive=True
        )

        gr.Markdown("[View all Text-to-Text models](https://huggingface.co/models?inference_provider=all&pipeline_tag=text-generation&sort=trending) | [View all multimodal models](https://huggingface.co/models?inference_provider=all&pipeline_tag=image-text-to-text&sort=trending)")
    # MCP TTS integration
    with gr.Accordion("MCP Integration", open=False):
        gr.Markdown("## Model Context Protocol (MCP) Integration")
        gr.Markdown("Connect to MCP servers to extend functionality.")

        tts_enabled = gr.Checkbox(
            label="Enable Text-to-Speech",
            value=False,
            info="When enabled, responses will be converted to speech using the selected MCP TTS server"
        )

        # Create dropdown for available MCP servers
        available_servers = list_mcp_servers()
        tts_server = gr.Dropdown(
            label="TTS Server",
            choices=available_servers,
            value=available_servers[0] if available_servers else None,
            interactive=True,
            visible=len(available_servers) > 0
        )

        # If no servers are configured, show a message
        if not available_servers:
            gr.Markdown("""
            No MCP servers configured. Add them using the MCP_CONFIG environment variable:
            ```json
            {
              "kokoroTTS": {
                "url": "https://your-kokoro-tts-server/gradio_api/mcp/sse"
              }
            }
            ```
            """)

    # Chat history state
    chat_history = gr.State([])
    # Function to filter models
    def filter_models(search_term):
        logger.info(f"Filtering models with search term: {search_term}")
        filtered = [m for m in models_list if search_term.lower() in m.lower()]
        logger.info(f"Filtered models: {filtered}")
        return gr.update(choices=filtered)

    # Function to set custom model from radio
    def set_custom_model_from_radio(selected):
        logger.info(f"Featured model selected: {selected}")
        return selected

    # Function for the chat interface
    def user(user_message, history):
        # Debug logging for troubleshooting
        logger.info(f"User message received: {user_message}")

        # Skip if message is empty (no text and no files)
        if not user_message or (not user_message.get("text") and not user_message.get("files")):
            logger.info("Empty message, skipping")
            return history

        # Prepare multimodal message format
        text_content = user_message.get("text", "").strip()
        files = user_message.get("files", [])
        logger.info(f"Text content: {text_content}")
        logger.info(f"Files: {files}")

        # If both text and files are empty, skip
        if not text_content and not files:
            logger.info("No content to display")
            return history

        # Add message with images to history
        if files and len(files) > 0:
            # Add text message first if it exists
            if text_content:
                # Add a separate text message
                logger.info(f"Adding text message: {text_content}")
                history.append([text_content, None])
            # Then add each image file separately
            for file_path in files:
                if file_path and isinstance(file_path, str):
                    logger.info(f"Adding image: {file_path}")
                    # Add image as a separate message with no text
                    history.append([f"![Image]({file_path})", None])
            return history
        else:
            # For text-only messages
            logger.info(f"Adding text-only message: {text_content}")
            history.append([text_content, None])
            return history
    # Define bot response function
    def bot(history, system_msg, max_tokens, temperature, top_p, freq_penalty, seed, provider, api_key, custom_model, search_term, selected_model, tts_enabled, tts_server):
        # Check if history is valid
        if not history or len(history) == 0:
            logger.info("No history to process")
            return history

        # Get the most recent message and detect if it's an image
        user_message = history[-1][0]
        logger.info(f"Processing user message: {user_message}")

        is_image = False
        image_path = None
        text_content = user_message

        # Check if this is an image message (marked with ![Image])
        if isinstance(user_message, str) and user_message.startswith("![Image]("):
            is_image = True
            # Extract image path from markdown format ![Image](path)
            image_path = user_message.replace("![Image](", "").replace(")", "")
            logger.info(f"Image detected: {image_path}")
            text_content = ""  # No text for image-only messages

        # Look back for text context if this is an image
        text_context = ""
        if is_image and len(history) > 1:
            # Use the previous message as context if it's text
            prev_message = history[-2][0]
            if isinstance(prev_message, str) and not prev_message.startswith("![Image]("):
                text_context = prev_message
                logger.info(f"Using text context from previous message: {text_context}")

        # Process message through respond function
        history[-1][1] = ""

        # Use either the image or text for the API
        if is_image:
            # For image messages
            for response in respond(
                text_context,   # Text context from previous message if any
                [image_path],   # Current image
                history[:-1],   # Previous history
                system_msg,
                max_tokens,
                temperature,
                top_p,
                freq_penalty,
                seed,
                provider,
                api_key,
                custom_model,
                search_term,
                selected_model,
                tts_enabled,
                tts_server
            ):
                history[-1][1] = response
                yield history
        else:
            # For text-only messages
            for response in respond(
                text_content,   # Text message
                None,           # No image
                history[:-1],   # Previous history
                system_msg,
                max_tokens,
                temperature,
                top_p,
                freq_penalty,
                seed,
                provider,
                api_key,
                custom_model,
                search_term,
                selected_model,
                tts_enabled,
                tts_server
            ):
                history[-1][1] = response
                yield history
    # Event handlers - only using the MultimodalTextbox's built-in submit functionality
    msg.submit(
        user,
        [msg, chatbot],
        [chatbot],
        queue=False
    ).then(
        bot,
        [chatbot, system_message_box, max_tokens_slider, temperature_slider, top_p_slider,
         frequency_penalty_slider, seed_slider, provider_radio, byok_textbox, custom_model_box,
         model_search_box, featured_model_radio, tts_enabled, tts_server],
        [chatbot]
    ).then(
        lambda: {"text": "", "files": []},  # Clear inputs after submission
        None,
        [msg]
    )

    # Connect the model filter to update the radio choices
    model_search_box.change(
        fn=filter_models,
        inputs=model_search_box,
        outputs=featured_model_radio
    )
    logger.info("Model search box change event linked.")

    # Connect the featured model radio to update the custom model box
    featured_model_radio.change(
        fn=set_custom_model_from_radio,
        inputs=featured_model_radio,
        outputs=custom_model_box
    )
    logger.info("Featured model radio button change event linked.")

    # Connect the BYOK textbox to validate provider selection
    byok_textbox.change(
        fn=validate_provider,
        inputs=[byok_textbox, provider_radio],
        outputs=provider_radio
    )
    logger.info("BYOK textbox change event linked.")

    # Also validate provider when the radio changes to ensure consistency
    provider_radio.change(
        fn=validate_provider,
        inputs=[byok_textbox, provider_radio],
        outputs=provider_radio
    )
    logger.info("Provider radio button change event linked.")

    # Update TTS server dropdown visibility based on the TTS toggle
    tts_enabled.change(
        lambda enabled: gr.update(visible=enabled and len(list_mcp_servers()) > 0),
        inputs=tts_enabled,
        outputs=tts_server
    )

logger.info("Gradio interface initialized.")
if __name__ == "__main__":
    logger.info("Launching the demo application.")
    demo.launch(show_api=True)