import gradio as gr
from huggingface_hub import InferenceClient
import os
import json
import base64
from PIL import Image
import io

ACCESS_TOKEN = os.getenv("HF_TOKEN")
print("Access token loaded.")

# Function to encode image to base64
def encode_image(image_path):
    if not image_path:
        print("No image path provided")
        return None
    try:
        print(f"Encoding image from path: {image_path}")
        # If it's already a PIL Image
        if isinstance(image_path, Image.Image):
            image = image_path
        else:
            # Try to open the image file
            image = Image.open(image_path)
        # Convert to RGB if the image has an alpha channel (RGBA)
        if image.mode == 'RGBA':
            image = image.convert('RGB')
        # Encode to base64
        buffered = io.BytesIO()
        image.save(buffered, format="JPEG")  # Keep JPEG for consistency with image_url
        img_str = base64.b64encode(buffered.getvalue()).decode("utf-8")
        print("Image encoded successfully")
        return img_str
    except Exception as e:
        print(f"Error encoding image: {e}")
        return None
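
# Example usage (sketch only; "photo.jpg" is a hypothetical path) -- respond() below
# builds the same kind of data URI from the encoded string:
#   encoded = encode_image("photo.jpg")
#   data_uri = f"data:image/jpeg;base64,{encoded}"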


def respond(
    message,
    image_files,  # Changed parameter name and structure
    history: list[tuple[str, str]],
    system_message,
    max_tokens,
    temperature,
    top_p,
    frequency_penalty,
    seed,
    provider,
    custom_api_key,
    custom_model,
    model_search_term,  # Retained for function signature consistency if called elsewhere
    selected_model  # Retained for function signature consistency
):
    """
    Core function to stream responses from a language model.

    Args:
        message (str | list): The user's message; can be text or multimodal content.
        image_files (list[str]): List of paths to image files for the current turn.
        history (list[tuple[str, str]]): Conversation history.
        system_message (str): System prompt for the model.
        max_tokens (int): Maximum tokens for the response.
        temperature (float): Sampling temperature.
        top_p (float): Top-p (nucleus) sampling.
        frequency_penalty (float): Frequency penalty.
        seed (int): Random seed (-1 for random).
        provider (str): Inference provider.
        custom_api_key (str): Custom API key.
        custom_model (str): Custom model ID.
        model_search_term (str): Term for searching models (UI related).
        selected_model (str): Model selected from the UI list.

    Yields:
        str: The cumulative response from the model.
    """
print(f"Received message: {message}") | |
print(f"Received {len(image_files) if image_files else 0} images for current turn") | |
print(f"History: {history}") | |
print(f"System message: {system_message}") | |
print(f"Max tokens: {max_tokens}, Temperature: {temperature}, Top-P: {top_p}") | |
print(f"Frequency Penalty: {frequency_penalty}, Seed: {seed}") | |
print(f"Selected provider: {provider}") | |
print(f"Custom API Key provided: {bool(custom_api_key.strip())}") | |
print(f"Selected model (custom_model): {custom_model}") | |
print(f"Model search term: {model_search_term}") | |
print(f"Selected model from radio: {selected_model}") | |
# Determine which token to use | |
token_to_use = custom_api_key if custom_api_key.strip() != "" else ACCESS_TOKEN | |
if custom_api_key.strip() != "": | |
print("USING CUSTOM API KEY: BYOK token provided by user is being used for authentication") | |
else: | |
print("USING DEFAULT API KEY: Environment variable HF_TOKEN is being used for authentication") | |
# Initialize the Inference Client with the provider and appropriate token | |
client = InferenceClient(token=token_to_use, provider=provider) | |
print(f"Hugging Face Inference Client initialized with {provider} provider.") | |
# Convert seed to None if -1 (meaning random) | |
if seed == -1: | |
seed = None | |

    # Create multimodal content if images are present for the current message.
    # 'message' is the text part of the current turn; 'image_files' holds the
    # image paths for the current turn.
    current_turn_content = []
    if message and isinstance(message, str) and message.strip():
        current_turn_content.append({
            "type": "text",
            "text": message
        })

    if image_files and len(image_files) > 0:
        for img_path in image_files:  # Iterate through paths in image_files
            if img_path is not None:
                try:
                    encoded_image = encode_image(img_path)  # img_path is already a path
                    if encoded_image:
                        current_turn_content.append({
                            "type": "image_url",
                            "image_url": {
                                "url": f"data:image/jpeg;base64,{encoded_image}"
                            }
                        })
                except Exception as e:
                    print(f"Error encoding image for current turn: {e}")

    # If current_turn_content is empty (e.g. only an empty text message), use the raw message
    if not current_turn_content and isinstance(message, str):
        final_user_content_for_api = message
    elif not current_turn_content and not isinstance(message, str):  # message is a complex type but empty
        final_user_content_for_api = ""  # or handle as an error
    else:
        final_user_content_for_api = current_turn_content

    # Prepare messages in the format expected by the API
    messages_for_api = [{"role": "system", "content": system_message}]
    print("Initial messages array constructed.")

    # Add conversation history to the context
    for val in history:  # history is list[tuple[str, str]]
        user_hist_msg_content = val[0]  # What the user typed, or image markdown
        assistant_hist_msg = val[1]

        # Process the user history message (could be text or a markdown image path)
        if user_hist_msg_content:
            # Check if it's an image markdown entry from history
            if isinstance(user_hist_msg_content, str) and user_hist_msg_content.startswith("![Image]("):
                hist_img_path = user_hist_msg_content.replace("![Image](", "").replace(")", "")
                encoded_hist_image = encode_image(hist_img_path)
                if encoded_hist_image:
                    messages_for_api.append({"role": "user", "content": [
                        {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{encoded_hist_image}"}}
                    ]})
                else:  # If image encoding fails, send a placeholder rather than skipping the turn
                    messages_for_api.append({"role": "user", "content": "[Image could not be loaded]"})
            else:  # It's a text message from history
                messages_for_api.append({"role": "user", "content": user_hist_msg_content})
            print(f"Added user message to API context from history (type: {type(user_hist_msg_content)})")

        if assistant_hist_msg:
            messages_for_api.append({"role": "assistant", "content": assistant_hist_msg})
            print(f"Added assistant message to API context from history: {assistant_hist_msg}")

    # Append the latest user message (including images, if any, for this turn)
    messages_for_api.append({"role": "user", "content": final_user_content_for_api})
    print(f"Latest user message appended to API context (content type: {type(final_user_content_for_api)})")

    # Determine which model to use, prioritizing custom_model if provided
    model_to_use = custom_model.strip() if custom_model.strip() != "" else selected_model
    print(f"Model selected for inference: {model_to_use}")

    # Start with an empty string to build the response as tokens stream in
    response_text = ""
    print(f"Sending request to {provider} provider.")

    # Prepare parameters for the chat completion request
    parameters = {
        "max_tokens": max_tokens,
        "temperature": temperature,
        "top_p": top_p,
        "frequency_penalty": frequency_penalty,
    }
    if seed is not None:
        parameters["seed"] = seed

    # Use the InferenceClient for making the request
    try:
        # Create a generator for the streaming response
        stream = client.chat_completion(
            model=model_to_use,
            messages=messages_for_api,  # Use the correctly formatted messages
            stream=True,
            **parameters
        )
        print("Received tokens: ", end="", flush=True)

        # Process the streaming response
        for chunk in stream:
            if hasattr(chunk, 'choices') and len(chunk.choices) > 0:
                # Extract the content from the response
                if hasattr(chunk.choices[0], 'delta') and hasattr(chunk.choices[0].delta, 'content'):
                    token_text_chunk = chunk.choices[0].delta.content
                    if token_text_chunk:
                        print(token_text_chunk, end="", flush=True)
                        response_text += token_text_chunk
                        yield response_text

        print()
    except Exception as e:
        print(f"Error during inference: {e}")
        response_text += f"\nError: {str(e)}"
        yield response_text

    print("Completed response generation.")


# Function to validate provider selection based on BYOK
def validate_provider(api_key, provider):
    if not api_key.strip() and provider != "hf-inference":
        return gr.update(value="hf-inference")
    return gr.update(value=provider)
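
# For illustration: without a BYOK key, any non-default provider falls back to "hf-inference",
# e.g. validate_provider("", "cerebras") returns gr.update(value="hf-inference").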


# GRADIO UI
with gr.Blocks(theme="Nymbo/Nymbo_Theme") as demo:
    # Create the chatbot component
    chatbot = gr.Chatbot(
        height=600,
        show_copy_button=True,
        placeholder="Select a model and begin chatting. Now supports multiple inference providers and multimodal inputs",
        layout="panel"
    )
    print("Chatbot interface created.")

    # Multimodal textbox for messages (combines text and file uploads)
    msg = gr.MultimodalTextbox(
        placeholder="Type a message or upload images...",
        show_label=False,
        container=False,
        scale=12,
        file_types=["image"],
        file_count="multiple",
        sources=["upload"]
    )

    # Create an accordion for settings
    with gr.Accordion("Settings", open=False):
        # System message
        system_message_box = gr.Textbox(
            value="You are a helpful AI assistant that can understand images and text.",
            placeholder="You are a helpful assistant.",
            label="System Prompt"
        )

        # Generation parameters
        with gr.Row():
            with gr.Column():
                max_tokens_slider = gr.Slider(
                    minimum=1,
                    maximum=4096,
                    value=512,
                    step=1,
                    label="Max tokens"
                )
                temperature_slider = gr.Slider(
                    minimum=0.1,
                    maximum=4.0,
                    value=0.7,
                    step=0.1,
                    label="Temperature"
                )
                top_p_slider = gr.Slider(
                    minimum=0.1,
                    maximum=1.0,
                    value=0.95,
                    step=0.05,
                    label="Top-P"
                )
            with gr.Column():
                frequency_penalty_slider = gr.Slider(
                    minimum=-2.0,
                    maximum=2.0,
                    value=0.0,
                    step=0.1,
                    label="Frequency Penalty"
                )
                seed_slider = gr.Slider(
                    minimum=-1,
                    maximum=65535,
                    value=-1,
                    step=1,
                    label="Seed (-1 for random)"
                )

        # Provider selection
        providers_list = [
            "hf-inference", "cerebras", "together", "sambanova",
            "novita", "cohere", "fireworks-ai", "hyperbolic", "nebius",
        ]
        provider_radio = gr.Radio(
            choices=providers_list, value="hf-inference", label="Inference Provider",
        )
        byok_textbox = gr.Textbox(
            value="", label="BYOK (Bring Your Own Key)",
            info="Enter a custom Hugging Face API key here. When empty, only the 'hf-inference' provider can be used.",
            placeholder="Enter your Hugging Face API token", type="password"
        )
        custom_model_box = gr.Textbox(
            value="", label="Custom Model",
            info="(Optional) Provide a custom Hugging Face model path. Overrides any selected featured model.",
            placeholder="meta-llama/Llama-3.3-70B-Instruct"
        )
        model_search_box = gr.Textbox(
            label="Filter Models", placeholder="Search for a featured model...", lines=1
        )
        models_list = [
            "meta-llama/Llama-3.2-11B-Vision-Instruct", "meta-llama/Llama-3.3-70B-Instruct",
            "meta-llama/Llama-3.1-70B-Instruct", "meta-llama/Llama-3.0-70B-Instruct",
            "meta-llama/Llama-3.2-3B-Instruct", "meta-llama/Llama-3.2-1B-Instruct",
            "meta-llama/Llama-3.1-8B-Instruct", "NousResearch/Hermes-3-Llama-3.1-8B",
            "NousResearch/Nous-Hermes-2-Mixtral-8x7B-DPO", "mistralai/Mistral-Nemo-Instruct-2407",
            "mistralai/Mixtral-8x7B-Instruct-v0.1", "mistralai/Mistral-7B-Instruct-v0.3",
            "mistralai/Mistral-7B-Instruct-v0.2", "Qwen/Qwen3-235B-A22B", "Qwen/Qwen3-32B",
            "Qwen/Qwen2.5-72B-Instruct", "Qwen/Qwen2.5-3B-Instruct", "Qwen/Qwen2.5-0.5B-Instruct",
            "Qwen/QwQ-32B", "Qwen/Qwen2.5-Coder-32B-Instruct", "microsoft/Phi-3.5-mini-instruct",
            "microsoft/Phi-3-mini-128k-instruct", "microsoft/Phi-3-mini-4k-instruct",
        ]
        featured_model_radio = gr.Radio(
            label="Select a model below", choices=models_list,
            value="meta-llama/Llama-3.2-11B-Vision-Instruct", interactive=True
        )
        gr.Markdown("[View all Text-to-Text models](https://huggingface.co/models?inference_provider=all&pipeline_tag=text-generation&sort=trending) | [View all multimodal models](https://huggingface.co/models?inference_provider=all&pipeline_tag=image-text-to-text&sort=trending)")

    # MCP Support Information
    with gr.Accordion("MCP Support (for AI Tool Use)", open=False):
        gr.Markdown("""
        ### MCP (Model Context Protocol) Enabled

        This application's text and image generation capability can be used as a tool by MCP-compatible AI models
        (e.g., certain versions of Claude, Cursor, or custom MCP clients like Tiny Agents).
        The primary interaction function (`bot`) is exposed as an MCP tool.
        Provide the conversation history and other parameters as arguments to the tool.
        For multimodal input, ensure the history correctly references image data that the server can access
        (Gradio's MCP layer may handle base64 to file conversion if the tool schema indicates file inputs).

        **MCP Server URL:**
        `https://YOUR_SPACE_NAME-serverless-textgen-hub.hf.space/gradio_api/mcp/sse`

        *(Replace `YOUR_SPACE_NAME` with your Hugging Face username or organization if this is a user space,
        or the full space name if different. You can find this URL in your browser once the Space is running.)*

        **Example MCP Client Configuration (`mcp.json` style):**
        ```json
        {
          "servers": [
            {
              "name": "ServerlessTextGenHubTool",
              "transport": {
                "type": "sse",
                "url": "https://YOUR_SPACE_NAME-serverless-textgen-hub.hf.space/gradio_api/mcp/sse"
              }
            }
          ]
        }
        ```

        **Note on Tool Schema:** The exact schema of the MCP tool will be determined by Gradio based on the `bot` function's
        signature (including type hints) and the Gradio components it interacts with.
        Refer to the `/gradio_api/mcp/schema` endpoint of your running application for the precise tool definition.
        For image inputs via MCP, clients should ideally send image URLs or base64 encoded data if the tool's schema supports file types.
        Gradio's MCP layer attempts to handle file data conversions.
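
        As a rough sketch (not part of the tool definition; it assumes the `requests` library is installed),
        the generated schema can be inspected like this:
        ```python
        # Sketch only: fetch the auto-generated MCP tool schema from a running Space.
        # Replace YOUR_SPACE_NAME as described above.
        import requests

        schema = requests.get(
            "https://YOUR_SPACE_NAME-serverless-textgen-hub.hf.space/gradio_api/mcp/schema",
            timeout=30,
        )
        print(schema.json())
        ```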
""") | |

    # Chat history state
    chat_history = gr.State([])  # Not directly used; the chatbot component handles its state internally

    # Function to filter models
    def filter_models(search_term: str):
        print(f"Filtering models with search term: {search_term}")
        filtered = [m for m in models_list if search_term.lower() in m.lower()]
        print(f"Filtered models: {filtered}")
        return gr.update(
            choices=filtered if filtered else models_list,
            value=(
                featured_model_radio.value
                if filtered and featured_model_radio.value in filtered
                else (filtered[0] if filtered else models_list[0])
            ),
        )

    # Function to set the custom model box from the radio selection
    def set_custom_model_from_radio(selected: str):
        print(f"Featured model selected: {selected}")
        # Directly return the selected model to update custom_model_box;
        # this keeps the two in sync until the user types into custom_model_box.
        return selected

    # Function for the chat interface (user's turn)
    def user(user_message_input: dict, history: list[list[str | None]]):
        print(f"User input (raw from MultimodalTextbox): {user_message_input}")
        text_content = user_message_input.get("text", "").strip()
        files = user_message_input.get("files", [])  # List of temp file paths
        print(f"Parsed text content: '{text_content}'")
        print(f"Parsed files: {files}")

        # Append the text message to history if present
        if text_content:
            history.append([text_content, None])
            print(f"Appended text to history: {text_content}")

        # Append image messages to history
        if files:
            for file_path in files:
                if file_path and isinstance(file_path, str):  # file_path is a temp path from Gradio
                    # Embed the image as a markdown link in history for display;
                    # the actual file path is used by `respond` via `bot`.
                    history.append([f"![Image]({file_path})", None])
                    print(f"Appended image to history: {file_path}")

        # If neither text nor files were provided, don't add an empty turn
        if not text_content and not files:
            print("Empty input, no change to history.")
            return history  # Return the current history as is

        return history

    # Define the bot response function
    def bot(
        history: list[list[str | None]],  # Type hint for history
        system_msg: str,
        max_tokens: int,
        temperature: float,
        top_p: float,
        freq_penalty: float,
        seed: int,
        provider: str,
        api_key: str,
        custom_model: str,
        # model_search_term: str,  # This argument comes from model_search_box
        selected_model: str  # This argument comes from featured_model_radio
    ):
        """
        Processes user input from the chat history, calls the language model via the 'respond'
        function, and streams the bot's response back to update the chat history.
        This function is intended to be exposed as an MCP tool.

        Args:
            history (list[list[str | None]]): The conversation history.
                Each item is [user_message, bot_message].
                User messages can be text or markdown image paths like "![Image](/path/to/image.png)".
            system_msg (str): The system prompt.
            max_tokens (int): Maximum number of tokens to generate.
            temperature (float): Sampling temperature for generation.
            top_p (float): Top-P (nucleus) sampling probability.
            freq_penalty (float): Frequency penalty for generation.
            seed (int): Random seed for generation (-1 for random).
            provider (str): The inference provider to use.
            api_key (str): Custom API key, if provided by the user.
            custom_model (str): Custom model path/ID. If empty, selected_model is used.
            selected_model (str): The model selected from the featured list.

        Yields:
            list[list[str | None]]: The updated chat history with the bot's streaming response.
        """
print(f"Bot function called. History: {history}") | |
if not history or history[-1][0] is None: # Check if last user message is None | |
print("No user message in the last history turn to process.") | |
# yield history # removed to avoid issues with Gradio expecting a specific sequence | |
return # Or raise an error, or handle appropriately | |
# The last user message is history[-1][0] | |
# The bot's response will go into history[-1][1] | |
user_turn_content = history[-1][0] | |
current_turn_text_message = "" | |
current_turn_image_paths = [] | |
# Check if the last user message in history is an image markdown | |
if isinstance(user_turn_content, str) and user_turn_content.startswith(": | |
# This is an image message | |
img_path = user_turn_content.replace(".replace(")", "") | |
current_turn_image_paths.append(img_path) | |
# Check if there was a text message immediately preceding this image in the same "turn" | |
# This requires looking at how `user` function structures history. | |
# `user` adds text and images as separate entries in history. | |
# So, if history[-1][0] is an image, history[-2][0] might be related text IF it was part of the same multimodal input. | |
# This logic becomes complex. Simpler: assume each history entry is distinct. | |
# For MCP, it's better if the client structures the call to `bot` clearly. | |
print(f"Processing image from history: {img_path}") | |
elif isinstance(user_turn_content, str): | |
# This is a text message | |
current_turn_text_message = user_turn_content | |
print(f"Processing text from history: {current_turn_text_message}") | |
else: | |
print(f"Unexpected content in history user turn: {user_turn_content}") | |
# yield history # removed | |
return | |
history[-1][1] = "" # Initialize bot response field for the current turn | |
# Call the 'respond' function. | |
# History for 'respond' should be prior turns, not including the current user message being processed. | |
history_for_respond = history[:-1] | |
for response_chunk in respond( | |
message=current_turn_text_message, # Text part of current turn | |
image_files=current_turn_image_paths, # Image paths of current turn | |
history=history_for_respond, # History up to the previous turn | |
system_message=system_msg, | |
max_tokens=max_tokens, | |
temperature=temperature, | |
top_p=top_p, | |
frequency_penalty=freq_penalty, | |
seed=seed, | |
provider=provider, | |
custom_api_key=api_key, | |
custom_model=custom_model, | |
model_search_term="", # Not directly used by respond's core logic here | |
selected_model=selected_model | |
): | |
history[-1][1] = response_chunk # Update bot response in the current turn | |
yield history | |

    # Event handlers
    # The parameters to `bot` must match the order of the inputs list below
    msg.submit(
        user,
        [msg, chatbot],
        [chatbot],
        queue=False
    ).then(
        bot,
        [chatbot, system_message_box, max_tokens_slider, temperature_slider, top_p_slider,
         frequency_penalty_slider, seed_slider, provider_radio, byok_textbox, custom_model_box,
         # model_search_box,  # Removed from bot inputs as it's UI only
         featured_model_radio],
        [chatbot]
    ).then(
        lambda: {"text": "", "files": []},
        None,
        [msg]
    )

    model_search_box.change(
        fn=filter_models, inputs=model_search_box, outputs=featured_model_radio
    )
    print("Model search box change event linked.")

    featured_model_radio.change(
        fn=set_custom_model_from_radio, inputs=featured_model_radio, outputs=custom_model_box
    )
    print("Featured model radio button change event linked.")

    byok_textbox.change(
        fn=validate_provider, inputs=[byok_textbox, provider_radio], outputs=provider_radio
    )
    print("BYOK textbox change event linked.")

    provider_radio.change(
        fn=validate_provider, inputs=[byok_textbox, provider_radio], outputs=provider_radio
    )
    print("Provider radio button change event linked.")

print("Gradio interface initialized.")

if __name__ == "__main__":
    print("Launching the demo application.")
    # mcp_server=True exposes the app's interaction function as an MCP tool
    demo.launch(show_api=True, mcp_server=True)