from __future__ import annotations

import re
import json
import sys
import os
import random
from io import StringIO
from typing import List, Dict, Tuple, Annotated, Literal, Optional

import gradio as gr
import requests
from bs4 import BeautifulSoup
from markdownify import markdownify as md
from readability import Document
from urllib.parse import urlparse
from ddgs import DDGS
from PIL import Image
from huggingface_hub import InferenceClient
import time
import tempfile
import uuid
import threading
from datetime import datetime

import numpy as np

# Optional heavy dependencies: torch and kokoro are only needed for TTS,
# so import failures degrade gracefully instead of crashing the app.
try:
    import torch
except Exception:
    torch = None
try:
    from kokoro import KModel, KPipeline
except Exception:
    KModel = None
    KPipeline = None
					
						
def _http_get_enhanced(url: str) -> requests.Response:
    """
    Download the page with enhanced headers, timeout handling, and better error recovery.
    """
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
        "Accept-Language": "en-US,en;q=0.9",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
        "Accept-Encoding": "gzip, deflate, br",
        "DNT": "1",
        "Connection": "keep-alive",
        "Upgrade-Insecure-Requests": "1",
    }

    # Respect the module-wide fetch rate limit (defined below) before hitting the network.
    _fetch_rate_limiter.acquire()

    try:
        response = requests.get(
            url,
            headers=headers,
            timeout=30,
            allow_redirects=True,
            stream=False,
        )
        response.raise_for_status()
        return response
    except requests.exceptions.Timeout:
        raise requests.exceptions.RequestException("Request timed out. The webpage took too long to respond.")
    except requests.exceptions.ConnectionError:
        raise requests.exceptions.RequestException("Connection error. Please check the URL and your internet connection.")
    except requests.exceptions.HTTPError as e:
        if response.status_code == 403:
            raise requests.exceptions.RequestException("Access forbidden. The website may be blocking automated requests.")
        elif response.status_code == 404:
            raise requests.exceptions.RequestException("Page not found. Please check the URL.")
        elif response.status_code == 429:
            raise requests.exceptions.RequestException("Rate limited. Please try again in a few minutes.")
        else:
            raise requests.exceptions.RequestException(f"HTTP error {response.status_code}: {str(e)}")
					
						
def _normalize_whitespace(text: str) -> str:
    """
    Squeeze extra spaces and blank lines to keep things compact.
    (Layman's terms: tidy up the text so it's not full of weird spacing.)
    """
    text = re.sub(r"[ \t\u00A0]+", " ", text)
    text = re.sub(r"\n\s*\n\s*\n+", "\n\n", text.strip())
    return text.strip()
					
						
def _truncate(text: str, max_chars: int | None) -> Tuple[str, bool]:
    """
    Cut text if it gets too long; return the text and whether we trimmed.
    (Layman's terms: shorten long text and tell us if we had to cut it.)
    """
    if max_chars is None or max_chars <= 0 or len(text) <= max_chars:
        return text, False
    return text[:max_chars].rstrip() + " …", True
					
						
def _shorten(text: str, limit: int) -> str:
    """
    Hard cap a string with an ellipsis to keep tokens small.
    (Layman's terms: force a string to a max length with an ellipsis.)
    """
    if limit <= 0 or len(text) <= limit:
        return text
    return text[: max(0, limit - 1)].rstrip() + "…"
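
# Illustrative sketch (comments only, nothing executed): expected behavior of
# the three text helpers above.
#   _normalize_whitespace("a \t b\n\n\n\nc")  ->  "a b\n\nc"
#   _truncate("abcdef", 4)                    ->  ("abcd …", True)
#   _shorten("abcdef", 4)                     ->  "abc…"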
					
						
def _domain_of(url: str) -> str:
    """
    Show a friendly site name like "example.com".
    (Layman's terms: pull the website's domain.)
    """
    try:
        return urlparse(url).netloc or ""
    except Exception:
        return ""
					
						
def _meta(soup: BeautifulSoup, name: str) -> str | None:
    """Return the content of a <meta name=...> tag, if present."""
    tag = soup.find("meta", attrs={"name": name})
    return tag.get("content") if tag and tag.has_attr("content") else None


def _og(soup: BeautifulSoup, prop: str) -> str | None:
    """Return the content of an Open Graph <meta property=...> tag, if present."""
    tag = soup.find("meta", attrs={"property": prop})
    return tag.get("content") if tag and tag.has_attr("content") else None
					
						
def _extract_metadata(soup: BeautifulSoup, final_url: str) -> Dict[str, str]:
    """
    Pull the useful bits: title, description, site name, canonical URL, language, etc.
    (Layman's terms: gather page basics like title/description/address.)
    """
    meta: Dict[str, str] = {}

    # Title: prefer <title>, then og:title, then twitter:title.
    title_candidates = [
        (soup.title.string if soup.title and soup.title.string else None),
        _og(soup, "og:title"),
        _meta(soup, "twitter:title"),
    ]
    meta["title"] = next((t.strip() for t in title_candidates if t and t.strip()), "")

    # Description: same fallback chain as the title.
    desc_candidates = [
        _meta(soup, "description"),
        _og(soup, "og:description"),
        _meta(soup, "twitter:description"),
    ]
    meta["description"] = next((d.strip() for d in desc_candidates if d and d.strip()), "")

    link_canonical = soup.find("link", rel=lambda v: v and "canonical" in v)
    meta["canonical"] = (link_canonical.get("href") or "").strip() if link_canonical else ""

    meta["site_name"] = (_og(soup, "og:site_name") or "").strip()
    html_tag = soup.find("html")
    meta["lang"] = (html_tag.get("lang") or "").strip() if html_tag else ""

    meta["fetched_url"] = final_url
    meta["domain"] = _domain_of(final_url)

    return meta
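
# Sketch of _extract_metadata on a minimal page (illustrative values only):
#   soup = BeautifulSoup('<html lang="en"><head><title>Hi</title></head></html>', "lxml")
#   _extract_metadata(soup, "https://example.com/a")
#   -> {"title": "Hi", "description": "", "canonical": "", "site_name": "",
#       "lang": "en", "fetched_url": "https://example.com/a", "domain": "example.com"}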
					
						
def _extract_main_text(html: str) -> Tuple[str, BeautifulSoup]:
    """
    Use Readability to isolate the main article and turn it into clean text.
    Returns (clean_text, soup_of_readable_html).
    (Layman's terms: find the real article text and clean it.)
    """
    doc = Document(html)
    readable_html = doc.summary(html_partial=True)

    s = BeautifulSoup(readable_html, "lxml")

    # Drop non-content elements that Readability sometimes leaves behind.
    for sel in ["script", "style", "noscript", "iframe", "svg"]:
        for tag in s.select(sel):
            tag.decompose()

    # Collect block-level text in document order.
    text_parts: List[str] = []
    for p in s.find_all(["p", "li", "h2", "h3", "h4", "blockquote"]):
        chunk = p.get_text(" ", strip=True)
        if chunk:
            text_parts.append(chunk)

    clean_text = _normalize_whitespace("\n\n".join(text_parts))
    return clean_text, s
					
						
def _fullpage_markdown_from_soup(full_soup: BeautifulSoup, base_url: str) -> str:
    """Convert a full page's soup to Markdown, keeping only the main content area."""
    # Remove obvious chrome before conversion.
    for element in full_soup.select("script, style, nav, footer, header, aside"):
        element.decompose()

    # Prefer semantic containers; fall back to common content class names, then <body>.
    main = (
        full_soup.find("main")
        or full_soup.find("article")
        or full_soup.find("div", class_=re.compile(r"content|main|post|article", re.I))
        or full_soup.find("body")
    )

    if not main:
        return "No main content found on the webpage."

    markdown_text = md(str(main), heading_style="ATX")

    # Tidy the conversion: collapse blank runs, drop empty links, squeeze spaces.
    markdown_text = re.sub(r"\n{3,}", "\n\n", markdown_text)
    markdown_text = re.sub(r"\[\s*\]\([^)]*\)", "", markdown_text)
    markdown_text = re.sub(r"[ \t]+", " ", markdown_text)
    markdown_text = markdown_text.strip()

    # Prepend the page title as an H1 when available.
    title = full_soup.find("title")
    if title and title.get_text(strip=True):
        markdown_text = f"# {title.get_text(strip=True)}\n\n{markdown_text}"

    return markdown_text or "No content could be extracted."
					
						
def _truncate_markdown(markdown: str, max_chars: int) -> str:
    """
    Truncate markdown content to a maximum character count while preserving structure.
    Tries to break at paragraph boundaries when possible.
    """
    if len(markdown) <= max_chars:
        return markdown

    truncated = markdown[:max_chars]

    # Prefer a paragraph boundary if one falls in the last 30% of the budget.
    last_paragraph = truncated.rfind('\n\n')
    if last_paragraph > max_chars * 0.7:
        truncated = truncated[:last_paragraph]
    # Otherwise settle for a sentence boundary near the end.
    elif '.' in truncated[-100:]:
        last_period = truncated.rfind('.')
        if last_period > max_chars * 0.8:
            truncated = truncated[:last_period + 1]

    return truncated.rstrip() + "\n\n> *[Content truncated for brevity]*"
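
# Sketch of the boundary preference above (hypothetical numbers): with
# max_chars=1000, a paragraph break past char 700 (70%) wins; failing that,
# a sentence-ending period past char 800 (80%) is used; otherwise the text
# is hard-cut. The truncation notice is appended in every case.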
					
						
def Fetch_Webpage(
    url: Annotated[str, "The absolute URL to fetch (must return HTML)."],
    verbosity: Annotated[str, "Controls output length: 'Brief' (1000 chars), 'Standard' (3000 chars), or 'Full' (complete page)."] = "Standard",
) -> str:
    """
    Fetch a web page and return it converted to Markdown format with configurable length.

    This function retrieves a webpage and converts its main content to clean Markdown,
    preserving headings, formatting, and structure. It automatically removes navigation,
    footers, scripts, and other non-content elements to focus on the main article or
    content area.

    Args:
        url (str): The absolute URL to fetch (must return HTML).
        verbosity (str): Controls output length:
            - "Brief": Truncate to 1000 characters for quick summaries
            - "Standard": Truncate to 3000 characters for balanced content
            - "Full": Return complete page content with no length limit

    Returns:
        str: The webpage content converted to Markdown format with:
            - Page title as H1 header
            - Main content converted to clean Markdown
            - Preserved heading hierarchy
            - Clean formatting without navigation/sidebar elements
            - Length controlled by verbosity setting
    """
    _log_call_start("Fetch_Webpage", url=url, verbosity=verbosity)
    if not url or not url.strip():
        result = "Please enter a valid URL."
        _log_call_end("Fetch_Webpage", _truncate_for_log(result))
        return result

    try:
        resp = _http_get_enhanced(url)
        resp.raise_for_status()
    except requests.exceptions.RequestException as e:
        result = f"An error occurred: {e}"
        _log_call_end("Fetch_Webpage", _truncate_for_log(result))
        return result

    final_url = str(resp.url)
    ctype = resp.headers.get("Content-Type", "")
    if "html" not in ctype.lower():
        result = f"Unsupported content type for extraction: {ctype or 'unknown'}"
        _log_call_end("Fetch_Webpage", _truncate_for_log(result))
        return result

    # Fall back to the detected encoding when the server doesn't declare one.
    resp.encoding = resp.encoding or resp.apparent_encoding
    html = resp.text

    full_soup = BeautifulSoup(html, "lxml")
    markdown_content = _fullpage_markdown_from_soup(full_soup, final_url)

    if verbosity == "Brief":
        result = _truncate_markdown(markdown_content, 1000)
    elif verbosity == "Standard":
        result = _truncate_markdown(markdown_content, 3000)
    else:
        result = markdown_content
    _log_call_end("Fetch_Webpage", f"markdown_chars={len(result)}")
    return result
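
# Usage sketch (performs a live HTTP request, so shown as a comment only):
#   print(Fetch_Webpage("https://example.com", verbosity="Brief"))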
					
						
# Rate limiting support.
import asyncio
from datetime import datetime, timedelta
					
						
class RateLimiter:
    """Simple sliding-window rate limiter shared by the fetch/search tools."""

    def __init__(self, requests_per_minute: int = 30):
        self.requests_per_minute = requests_per_minute
        self.requests = []

    def acquire(self):
        """Synchronous rate limiting for non-async contexts."""
        now = datetime.now()
        # Drop request timestamps older than one minute.
        self.requests = [
            req for req in self.requests if now - req < timedelta(minutes=1)
        ]

        if len(self.requests) >= self.requests_per_minute:
            # Sleep until the oldest request falls out of the window.
            wait_time = 60 - (now - self.requests[0]).total_seconds()
            if wait_time > 0:
                time.sleep(max(1, wait_time))

        self.requests.append(now)


_search_rate_limiter = RateLimiter(requests_per_minute=20)
_fetch_rate_limiter = RateLimiter(requests_per_minute=25)
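
# Sketch of the limiter's behavior (illustrative): with requests_per_minute=2,
# two acquire() calls return immediately; a third within the same minute
# sleeps until the oldest recorded timestamp ages past 60 seconds.
#   rl = RateLimiter(requests_per_minute=2)
#   rl.acquire(); rl.acquire()   # immediate
#   rl.acquire()                 # blocks for up to ~60 seconds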
					
						
def _truncate_for_log(value: str, limit: int = 500) -> str:
    """Truncate long strings for concise terminal logging."""
    if len(value) <= limit:
        return value
    return value[:limit - 1] + "…"
					
						
def _serialize_input(val):
    """Best-effort compact serialization of arbitrary input values for logging."""
    try:
        if isinstance(val, (str, int, float, bool)) or val is None:
            return val
        if isinstance(val, (list, tuple)):
            return [_serialize_input(v) for v in list(val)[:10]] + (["…"] if len(val) > 10 else [])
        if isinstance(val, dict):
            out = {}
            for i, (k, v) in enumerate(val.items()):
                if i >= 12:
                    out["…"] = "…"
                    break
                out[str(k)] = _serialize_input(v)
            return out
        return repr(val)[:120]
    except Exception:
        return "<unserializable>"
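
# Sketch of _serialize_input's compaction (illustrative):
#   _serialize_input({"a": list(range(12))})
#   -> {"a": [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, "…"]}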
					
						
def _log_call_start(func_name: str, **kwargs) -> None:
    try:
        compact = {k: _serialize_input(v) for k, v in kwargs.items()}
        print(f"[TOOL CALL] {func_name} inputs: {json.dumps(compact, ensure_ascii=False)[:800]}", flush=True)
    except Exception as e:
        print(f"[TOOL CALL] {func_name} (failed to log inputs: {e})", flush=True)


def _log_call_end(func_name: str, output_desc: str) -> None:
    try:
        print(f"[TOOL RESULT] {func_name} output: {output_desc}", flush=True)
    except Exception as e:
        print(f"[TOOL RESULT] {func_name} (failed to log output: {e})", flush=True)
					
						
def Search_DuckDuckGo(
    query: Annotated[str, "The search query (supports operators like site:, quotes, OR)."],
    max_results: Annotated[int, "Number of results to return (1–20)."] = 5,
) -> str:
    """
    Run a DuckDuckGo search and return numbered results with URLs, titles, and summaries.

    Args:
        query (str): The search query string. Supports operators like site:, quotes for exact matching,
            OR for alternatives, and other DuckDuckGo search syntax.
            Examples:
                - Basic search: "Python programming"
                - Site search: "site:example.com"
                - Exact phrase: "artificial intelligence"
                - Exclude terms: "cats -dogs"
        max_results (int): Number of results to return (1–20). Default: 5.

    Returns:
        str: Search results in readable format with titles, URLs, and snippets as a numbered list.
    """
    _log_call_start("Search_DuckDuckGo", query=query, max_results=max_results)
    if not query or not query.strip():
        result = "No search query provided. Please enter a search term."
        _log_call_end("Search_DuckDuckGo", _truncate_for_log(result))
        return result

    max_results = max(1, min(20, max_results))

    try:
        _search_rate_limiter.acquire()
        with DDGS() as ddgs:
            raw = ddgs.text(query, max_results=max_results)
    except Exception as e:
        error_msg = f"Search failed: {str(e)[:200]}"
        if "blocked" in str(e).lower() or "rate" in str(e).lower():
            error_msg = "Search temporarily blocked due to rate limiting. Please try again in a few minutes."
        elif "timeout" in str(e).lower():
            error_msg = "Search timed out. Please try again with a simpler query."
        elif "network" in str(e).lower() or "connection" in str(e).lower():
            error_msg = "Network connection error. Please check your internet connection and try again."
        result = f"Error: {error_msg}"
        _log_call_end("Search_DuckDuckGo", _truncate_for_log(result))
        return result

    if not raw:
        result = f"No results found for query: {query}"
        _log_call_end("Search_DuckDuckGo", _truncate_for_log(result))
        return result

    results = []
    for r in raw or []:
        title = (r.get("title") or "").strip()
        url = (r.get("href") or r.get("link") or "").strip()
        body = (r.get("body") or r.get("snippet") or "").strip()

        if not url:
            continue

        results.append({
            "title": title or _domain_of(url),
            "url": url,
            "snippet": body,
        })

    if not results:
        result = f"No valid results found for query: {query}"
        _log_call_end("Search_DuckDuckGo", _truncate_for_log(result))
        return result

    lines = [f"Found {len(results)} search results for: {query}\n"]
    for i, item in enumerate(results, 1):
        lines.append(f"{i}. {item['title']}")
        lines.append(f"   URL: {item['url']}")
        if item['snippet']:
            lines.append(f"   Summary: {item['snippet']}")
        lines.append("")
    result = "\n".join(lines)
    _log_call_end("Search_DuckDuckGo", f"results={len(results)} chars={len(result)}")
    return result
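
# Usage sketch (performs a live search, so shown as a comment only; the query
# is a hypothetical example):
#   print(Search_DuckDuckGo("site:python.org asyncio", max_results=3))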
					
						
def Execute_Python(code: Annotated[str, "Python source code to run; stdout is captured and returned."]) -> str:
    """
    Execute arbitrary Python code and return captured stdout or an error message.

    Args:
        code (str): Python source code to run; stdout is captured and returned.

    Returns:
        str: Combined stdout produced by the code, or the exception text if
        execution failed.
    """
    _log_call_start("Execute_Python", code=_truncate_for_log(code or "", 300))
    if code is None:
        result = "No code provided."
        _log_call_end("Execute_Python", result)
        return result

    # Temporarily redirect stdout so print() output can be captured.
    old_stdout = sys.stdout
    redirected_output = sys.stdout = StringIO()
    try:
        exec(code)
        result = redirected_output.getvalue()
    except Exception as e:
        result = str(e)
    finally:
        sys.stdout = old_stdout
    _log_call_end("Execute_Python", _truncate_for_log(result))
    return result
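
# Usage sketch: Execute_Python("print(2 + 2)") returns "4\n". Note that exec()
# runs in this process with no sandboxing, and only stdout is captured, so a
# bare expression such as "2 + 2" produces no output.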
					
						
_KOKORO_STATE = {
    "initialized": False,
    "device": "cpu",
    "model": None,
    "pipelines": {},
}
					
						
def get_kokoro_voices():
    """Get the comprehensive list of available Kokoro voice IDs (54 total)."""
    try:
        from huggingface_hub import list_repo_files

        files = list_repo_files('hexgrad/Kokoro-82M')
        voice_files = [f for f in files if f.endswith('.pt') and f.startswith('voices/')]
        voices = [f.replace('voices/', '').replace('.pt', '') for f in voice_files]
        return sorted(voices) if voices else _get_fallback_voices()
    except Exception:
        return _get_fallback_voices()
					
						
def _get_fallback_voices():
    """Return comprehensive fallback list of known Kokoro voices (54 total)."""
    return [
        # American female / male
        "af_alloy", "af_aoede", "af_bella", "af_heart", "af_jessica",
        "af_kore", "af_nicole", "af_nova", "af_river", "af_sarah", "af_sky",
        "am_adam", "am_echo", "am_eric", "am_fenrir", "am_liam",
        "am_michael", "am_onyx", "am_puck", "am_santa",
        # British female / male
        "bf_alice", "bf_emma", "bf_isabella", "bf_lily",
        "bm_daniel", "bm_fable", "bm_george", "bm_lewis",
        # European female / male
        "ef_dora", "em_alex", "em_santa",
        # French female
        "ff_siwis",
        # Hindi female / male
        "hf_alpha", "hf_beta", "hm_omega", "hm_psi",
        # Italian female / male
        "if_sara", "im_nicola",
        # Japanese female / male
        "jf_alpha", "jf_gongitsune", "jf_nezumi", "jf_tebukuro", "jm_kumo",
        # Portuguese female / male
        "pf_dora", "pm_alex", "pm_santa",
        # Chinese female / male
        "zf_xiaobei", "zf_xiaoni", "zf_xiaoxiao", "zf_xiaoyi",
        "zm_yunjian", "zm_yunxi", "zm_yunxia", "zm_yunyang",
    ]
					
						
def _init_kokoro() -> None:
    """Lazy-initialize Kokoro model and pipelines on first use.

    Tries CUDA if torch is present and available; falls back to CPU. Keeps a
    minimal English pipeline and a custom lexicon tweak for the word "kokoro".
    """
    if _KOKORO_STATE["initialized"]:
        return

    if KModel is None or KPipeline is None:
        raise RuntimeError(
            "Kokoro is not installed. Please install the 'kokoro' package (>=0.9.4)."
        )

    device = "cpu"
    if torch is not None:
        try:
            if torch.cuda.is_available():
                device = "cuda"
        except Exception:
            device = "cpu"

    model = KModel().to(device).eval()
    pipelines = {"a": KPipeline(lang_code="a", model=False)}

    # Pin the pronunciation of "kokoro" itself; ignore failures on API changes.
    try:
        pipelines["a"].g2p.lexicon.golds["kokoro"] = "kˈOkəɹO"
    except Exception:
        pass

    _KOKORO_STATE.update(
        {
            "initialized": True,
            "device": device,
            "model": model,
            "pipelines": pipelines,
        }
    )
					
						
def List_Kokoro_Voices() -> List[str]:
    """
    Get a list of all available Kokoro voice identifiers.

    This MCP tool helps clients discover the 54 available voice options
    for the Generate_Speech tool.

    Returns:
        List[str]: A list of voice identifiers (e.g., ["af_heart", "am_adam", "bf_alice", ...])

    Voice naming convention:
        - First letter: language/region (a=American, b=British, etc.)
        - Second letter: gender (f=female, m=male)
        - Following letters: voice name (heart, adam, alice, etc.)

    Available categories:
        - American Female/Male (20 voices)
        - British Female/Male (8 voices)
        - European Female/Male (3 voices)
        - French Female (1 voice)
        - Hindi Female/Male (4 voices)
        - Italian Female/Male (2 voices)
        - Japanese Female/Male (5 voices)
        - Portuguese Female/Male (3 voices)
        - Chinese Female/Male (8 voices)
    """
    return get_kokoro_voices()
					
						
def Generate_Speech(
    text: Annotated[str, "The text to synthesize (English)."],
    speed: Annotated[float, "Speech speed multiplier in 0.5–2.0; 1.0 = normal speed."] = 1.25,
    voice: Annotated[str, "Voice identifier from 54 available options."] = "af_heart",
) -> Tuple[int, np.ndarray]:
    """
    Synthesize speech from text using the Kokoro-82M TTS model.

    This function returns raw audio suitable for a Gradio Audio component and is
    also exposed as an MCP tool. It supports 54 different voices across multiple
    languages and accents including American, British, European, Hindi, Italian,
    Japanese, Portuguese, and Chinese speakers.

    Args:
        text (str): The text to synthesize. Works best with English but supports multiple languages.
        speed (float): Speech speed multiplier in 0.5–2.0; 1.0 = normal speed. Default: 1.25 (slightly brisk).
        voice (str): Voice identifier from 54 available options. Default: 'af_heart'.

    Returns:
        A tuple of (sample_rate_hz, audio_waveform) where:
            - sample_rate_hz: int sample rate in Hz (24_000)
            - audio_waveform: numpy.ndarray float32 mono waveform in range [-1, 1]
    """
    _log_call_start("Generate_Speech", text=_truncate_for_log(text, 200), speed=speed, voice=voice)
    if not text or not text.strip():
        _log_call_end("Generate_Speech", "error=empty text")
        raise gr.Error("Please provide non-empty text to synthesize.")

    _init_kokoro()
    model = _KOKORO_STATE["model"]
    pipelines = _KOKORO_STATE["pipelines"]

    pipeline = pipelines.get("a")
    if pipeline is None:
        raise gr.Error("Kokoro English pipeline not initialized.")

    audio_segments = []
    pack = pipeline.load_voice(voice)

    try:
        # The pipeline chunks long text into segments; synthesize each in turn.
        segments = list(pipeline(text, voice, speed))
        total_segments = len(segments)

        for segment_idx, (text_chunk, ps, _) in enumerate(segments):
            ref_s = pack[len(ps) - 1]
            try:
                audio = model(ps, ref_s, float(speed))
                audio_segments.append(audio.detach().cpu().numpy())

                # Emit occasional progress for long jobs.
                if total_segments > 10 and (segment_idx + 1) % 5 == 0:
                    print(f"Progress: Generated {segment_idx + 1}/{total_segments} segments...")
            except Exception as e:
                raise gr.Error(f"Error generating audio for segment {segment_idx + 1}: {str(e)}")

        if not audio_segments:
            raise gr.Error("No audio was generated (empty synthesis result).")

        if len(audio_segments) == 1:
            final_audio = audio_segments[0]
        else:
            final_audio = np.concatenate(audio_segments, axis=0)

        duration = len(final_audio) / 24_000
        if total_segments > 1:
            print(f"Completed: {total_segments} segments concatenated into {duration:.1f} seconds of audio")

        _log_call_end("Generate_Speech", f"samples={final_audio.shape[0]} duration_sec={duration:.2f}")
        return 24_000, final_audio

    except gr.Error as e:
        _log_call_end("Generate_Speech", f"gr_error={str(e)}")
        raise
    except Exception as e:
        _log_call_end("Generate_Speech", f"error={str(e)[:120]}")
        raise gr.Error(f"Error during speech generation: {str(e)}")
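
# Usage sketch (requires the optional torch/kokoro dependencies):
#   sr, wav = Generate_Speech("Hello from Kokoro.", speed=1.0, voice="af_heart")
#   # sr == 24_000; wav is a float32 mono waveform in [-1, 1]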
					
						
# Lightweight local memory store: plaintext JSON on disk, guarded by an
# in-process lock. Intended for local, single-user use only.
MEMORY_FILE = os.path.join(os.path.dirname(__file__), "memories.json")
_MEMORY_LOCK = threading.RLock()
_MAX_MEMORIES = 10_000
					
						
def _now_iso() -> str:
    """Return the current UTC time formatted as "YYYY-MM-DD HH:MM:SS"."""
    return datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")
					
						
def _load_memories() -> List[Dict[str, str]]:
    """Internal helper: load memory list from disk.

    Returns an empty list if the file does not exist or is unreadable.
    If the JSON is corrupted, a *.corrupt backup is written once and a
    fresh empty list is returned (fail-open philosophy for tool usage).
    """
    if not os.path.exists(MEMORY_FILE):
        return []
    try:
        with open(MEMORY_FILE, "r", encoding="utf-8") as f:
            data = json.load(f)
        if isinstance(data, list):
            # Keep only well-formed entries.
            cleaned: List[Dict[str, str]] = []
            for item in data:
                if isinstance(item, dict) and "id" in item and "text" in item:
                    cleaned.append(item)
            return cleaned
        return []
    except Exception:
        # Preserve the corrupt file once for post-mortem, then start fresh.
        try:
            backup = MEMORY_FILE + ".corrupt"
            if not os.path.exists(backup):
                os.replace(MEMORY_FILE, backup)
        except Exception:
            pass
        return []
					
						
def _save_memories(memories: List[Dict[str, str]]) -> None:
    """Persist memory list atomically to disk (write temp then replace).

    os.replace makes the final rename atomic, so readers never observe a
    half-written memories.json.
    """
    tmp_path = MEMORY_FILE + ".tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(memories, f, ensure_ascii=False, indent=2)
    os.replace(tmp_path, MEMORY_FILE)
					
						
def _mem_save(
    text: Annotated[str, "Raw textual content to remember (will be stored verbatim)."],
    tags: Annotated[str, "Optional comma-separated tags for lightweight categorization (e.g. 'user, preference')."] = "",
) -> str:
    """(Internal) Persist a new memory record.

    Summary:
        Adds a memory object to the local JSON store (no external database).

    Stored Fields:
        - id (str, UUID4)
        - text (str, verbatim user content)
        - timestamp (UTC "YYYY-MM-DD HH:MM:SS")
        - tags (str, original comma-separated tag string)

    Behavior / Rules:
        1. Whitespace is trimmed; empty text is rejected.
        2. If the most recent existing memory has identical text, the new one is skipped (light dedupe heuristic).
        3. When total entries exceed _MAX_MEMORIES, oldest entries are pruned (soft cap).
        4. Operation is protected by an in-process reentrant lock only (no cross-process locking).

    Returns:
        str: Human-readable confirmation containing the new memory UUID.

    Security / Privacy:
        Data is plaintext JSON on local disk; do NOT store secrets or regulated data.
    """
    text_clean = (text or "").strip()
    if not text_clean:
        return "Error: memory text is empty."

    with _MEMORY_LOCK:
        memories = _load_memories()
        if memories and memories[-1].get("text") == text_clean:
            return "Skipped: identical to last stored memory."

        mem_id = str(uuid.uuid4())
        entry = {
            "id": mem_id,
            "text": text_clean,
            "timestamp": _now_iso(),
            "tags": tags.strip(),
        }
        memories.append(entry)
        if len(memories) > _MAX_MEMORIES:
            # Prune the oldest entries to stay under the soft cap.
            overflow = len(memories) - _MAX_MEMORIES
            memories = memories[overflow:]
        _save_memories(memories)
        return f"Memory saved: {mem_id}"
					
						
def _mem_list(
    limit: Annotated[int, "Maximum number of most recent memories to return (1–200)."] = 20,
    include_tags: Annotated[bool, "If true, include tags column in output."] = True,
) -> str:
    """(Internal) List most recent memories.

    Parameters:
        limit (int): Max rows to return; clamped to [1, 200].
        include_tags (bool): Include tags section when True.

    Output Format (one per line):
        <uuid_prefix> [YYYY-MM-DD HH:MM:SS] <text> | tags: <tag list>
        (Tag column omitted if empty or include_tags=False.)

    Returns:
        str: Joined newline string or a friendly "No memories stored." message.
    """
    limit = max(1, min(200, limit))
    with _MEMORY_LOCK:
        memories = _load_memories()
    if not memories:
        return "No memories stored yet."

    chosen = memories[-limit:][::-1]  # newest first
    lines: List[str] = []
    for m in chosen:
        base = f"{m['id'][:8]} [{m.get('timestamp','?')}] {m.get('text','')}"
        if include_tags and m.get("tags"):
            base += f" | tags: {m['tags']}"
        lines.append(base)
    omitted = len(memories) - len(chosen)
    if omitted > 0:
        lines.append(f"… ({omitted} older {'memories' if omitted != 1 else 'memory'} omitted; total={len(memories)})")
    return "\n".join(lines)
					
						
def _mem_search(
    query: Annotated[str, "Case-insensitive substring search; space-separated terms are ANDed."],
    limit: Annotated[int, "Maximum number of matches (1–200)."] = 20,
) -> str:
    """(Internal) Full-text style AND search across text and tags.

    Search Semantics:
        - Split query on whitespace into individual terms.
        - A memory matches only if EVERY term appears (case-insensitively) in the text OR tags field.
        - Results are ordered newest-first (descending timestamp).

    Parameters:
        query (str): Raw user query string; must contain at least one non-space character.
        limit (int): Max rows to return; clamped to [1, 200].

    Returns:
        str: Formatted lines identical to _mem_list output, or "No matches".
    """
    q = (query or "").strip()
    if not q:
        return "Error: empty query."
    terms = [t.lower() for t in q.split() if t.strip()]
    if not terms:
        return "Error: no valid search terms."
    limit = max(1, min(200, limit))
    with _MEMORY_LOCK:
        memories = _load_memories()

    matches: List[Dict[str, str]] = []
    total_matches = 0
    for m in reversed(memories):  # newest first
        hay = (m.get("text", "") + " " + m.get("tags", "")).lower()
        if all(t in hay for t in terms):
            total_matches += 1
            if len(matches) < limit:
                matches.append(m)
    if not matches:
        return f"No matches for: {query}"
    lines = [
        f"{m['id'][:8]} [{m.get('timestamp','?')}] {m.get('text','')}" + (f" | tags: {m['tags']}" if m.get('tags') else "")
        for m in matches
    ]
    omitted = total_matches - len(matches)
    if omitted > 0:
        lines.append(f"… ({omitted} additional match{'es' if omitted != 1 else ''} omitted; total_matches={total_matches})")
    return "\n".join(lines)
					
						
def _mem_delete(
    memory_id: Annotated[str, "Full UUID or a unique prefix (>=4 chars) of the memory id to delete."],
) -> str:
    """(Internal) Delete one memory by UUID or unique prefix.

    Parameters:
        memory_id (str): Full UUID4 (preferred) OR a unique prefix (>=4 chars). If the prefix is ambiguous, no deletion occurs.

    Returns:
        str: One of: success message, ambiguity notice, or not-found message.

    Safety:
        Ambiguous prefixes are rejected to prevent accidental mass deletion.
    """
    key = (memory_id or "").strip().lower()
    if len(key) < 4:
        return "Error: supply at least 4 characters of the id."
    with _MEMORY_LOCK:
        memories = _load_memories()
        matched = [m for m in memories if m["id"].lower().startswith(key)]
        if not matched:
            return "Memory not found."
        if len(matched) > 1 and key != matched[0]["id"].lower():
            # Refuse to guess between several candidate ids.
            sample = ", ".join(m["id"][:8] for m in matched[:5])
            more = "…" if len(matched) > 5 else ""
            return f"Ambiguous prefix (matches {len(matched)} ids: {sample}{more}). Provide more characters."

        target_id = matched[0]["id"]
        memories = [m for m in memories if m["id"] != target_id]
        _save_memories(memories)
        return f"Deleted memory: {target_id}"
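
# Sketch of a typical memory round-trip (ids are illustrative):
#   _mem_save("User prefers dark mode", tags="user, preference")  # -> "Memory saved: <uuid4>"
#   _mem_list(limit=5)           # newest-first listing
#   _mem_search("dark mode")     # AND-matched substring search
#   _mem_delete("3f2a1b9c")      # full id or unique prefix (>= 4 chars)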
					
						
fetch_interface = gr.Interface(
    fn=Fetch_Webpage,
    inputs=[
        gr.Textbox(label="URL", placeholder="https://example.com/article"),
        gr.Dropdown(
            label="Verbosity",
            choices=["Brief", "Standard", "Full"],
            value="Standard",
            info="Brief: 1000 chars, Standard: 3000 chars, Full: complete page",
        ),
    ],
    outputs=gr.Markdown(label="Extracted Markdown"),
    title="Fetch Webpage",
    description=(
        "<div style=\"text-align:center\">Convert any webpage to clean Markdown format with configurable length, preserving structure and formatting while removing navigation and clutter.</div>"
    ),
    api_description=(
        "Fetch a web page and return it converted to Markdown format with configurable length. "
        "Parameters: url (str - absolute URL), verbosity (str - Brief/Standard/Full controlling output length: Brief=1000 chars, Standard=3000 chars, Full=complete page)."
    ),
    flagging_mode="never",
)
					
						
concise_interface = gr.Interface(
    fn=Search_DuckDuckGo,
    inputs=[
        gr.Textbox(label="Query", placeholder="topic OR site:example.com"),
        gr.Slider(minimum=1, maximum=20, value=5, step=1, label="Max results"),
    ],
    outputs=gr.Textbox(label="Search Results", interactive=False),
    title="DuckDuckGo Search",
    description=(
        "<div style=\"text-align:center\">Web search with readable output format. Supports advanced search operators.</div>"
    ),
    api_description=(
        "Run a DuckDuckGo search and return numbered results with URLs, titles, and summaries. "
        "Supports advanced search operators: site: for specific domains, quotes for exact phrases, "
        "OR for alternatives, and - to exclude terms. Examples: 'Python programming', 'site:example.com', "
        "'\"artificial intelligence\"', 'cats -dogs', 'Python OR JavaScript'."
    ),
    flagging_mode="never",
    submit_btn="Search",
)
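
# Hedged sketch (not invoked): the raw ddgs call a wrapper like
# Search_DuckDuckGo sits on top of. The context-manager usage and the result
# field names ("title", "href", "body") follow the ddgs text-search schema,
# which may vary across package versions.
def _example_raw_ddgs(query: str = '"artificial intelligence" site:example.com', max_results: int = 5) -> list:
    with DDGS() as ddgs:
        return list(ddgs.text(query, max_results=max_results))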
					
						
code_interface = gr.Interface(
    fn=Execute_Python,
    inputs=gr.Code(label="Python Code", language="python"),
    outputs=gr.Textbox(label="Output"),
    title="Python Code Executor",
    description=(
        "<div style=\"text-align:center\">Execute Python code and see the output.</div>"
    ),
    api_description=(
        "Execute arbitrary Python code and return captured stdout or an error message. "
        "Supports any valid Python code including imports, variables, functions, loops, and calculations. "
        "Examples: 'print(2+2)', 'import math; print(math.sqrt(16))', 'for i in range(3): print(i)'. "
        "Parameters: code (str - Python source code to execute). "
        "Returns: Combined stdout output or exception text if execution fails."
    ),
    flagging_mode="never",
)
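
# Hedged sketch (not invoked) of the stdout-capture pattern an executor like
# Execute_Python typically relies on: redirect stdout into a StringIO buffer
# around exec(). Illustrative only; it applies no sandboxing or timeouts.
def _example_capture_stdout(code: str) -> str:
    import contextlib
    buf = StringIO()
    try:
        with contextlib.redirect_stdout(buf):
            exec(code, {"__name__": "__main__"})
    except Exception as exc:
        return f"Error: {type(exc).__name__}: {exc}"
    return buf.getvalue()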
					
						
CSS_STYLES = """
.gradio-container h1 {
    text-align: center;
    /* Ensure main title appears first, then our two subtitle lines */
    display: grid;
    justify-items: center;
}
/* Place bold tools list on line 2, normal auth note on line 3 (below title) */
.gradio-container h1::before {
    grid-row: 2;
    content: "Fetch Webpage | Search DuckDuckGo | Python Interpreter | Memory Manager | Kokoro TTS | Image Generation | Video Generation";
    display: block;
    font-size: 1rem;
    font-weight: 700;
    opacity: 0.9;
    margin-top: 6px;
    white-space: pre-wrap;
}
.gradio-container h1::after {
    grid-row: 3;
    content: "Authentication is optional but Image/Video Generation require a `HF_READ_TOKEN` in env secrets. They are hidden otherwise. Same with Memory (intended for local use).";
    display: block;
    font-size: 1rem;
    font-weight: 400;
    opacity: 0.9;
    margin-top: 2px;
    white-space: pre-wrap;
}

/* Remove inside tab panels so it doesn't duplicate under each tool title */
.gradio-container [role="tabpanel"] h1::before,
.gradio-container [role="tabpanel"] h1::after {
    content: none !important;
}
"""
					
						
available_voices = get_kokoro_voices()
kokoro_interface = gr.Interface(
    fn=Generate_Speech,
    inputs=[
        gr.Textbox(label="Text", placeholder="Type text to synthesize…", lines=4),
        gr.Slider(minimum=0.5, maximum=2.0, value=1.25, step=0.1, label="Speed"),
        gr.Dropdown(
            label="Voice",
            choices=available_voices,
            value="af_heart",
            info="Select from 54 available voices across multiple languages and accents",
        ),
    ],
    outputs=gr.Audio(label="Audio", type="numpy", format="wav", show_download_button=True),
    title="Kokoro TTS",
    description=(
        "<div style=\"text-align:center\">Generate speech with Kokoro-82M. Supports multiple languages and accents. Runs on CPU or CUDA if available.</div>"
    ),
    api_description=(
        "Synthesize speech from text using the Kokoro-82M TTS model. Returns (sample_rate, waveform) suitable for playback. "
        "Supports unlimited text length by processing all segments. Voice examples: 'af_heart' (US female), 'am_onyx' (US male), "
        "'bf_emma' (British female), 'af_sky' (US female), 'af_nicole' (US female). "
        "Parameters: text (str), speed (float 0.5–2.0, default 1.25), voice (str from 54 available options, default 'af_heart'). "
        "Return the generated media to the user in this format ``"
    ),
    flagging_mode="never",
)
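
# Hedged sketch (not invoked): writing the (sample_rate, waveform) tuple that
# Generate_Speech returns to disk with the standard-library wave module.
# Assumes a mono float waveform in [-1, 1], which Kokoro pipelines typically emit.
def _example_save_wav(sample_rate: int, waveform, path: str = "speech.wav") -> str:
    import wave
    pcm16 = (np.clip(waveform, -1.0, 1.0) * 32767.0).astype(np.int16)
    with wave.open(path, "wb") as f:
        f.setnchannels(1)      # mono
        f.setsampwidth(2)      # 16-bit PCM
        f.setframerate(sample_rate)
        f.writeframes(pcm16.tobytes())
    return path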
					
						
def Memory_Manager(
    action: Annotated[Literal["save", "list", "search", "delete"], "Action to perform: save | list | search | delete"],
    text: Annotated[Optional[str], "Text content (Save only)"] = None,
    tags: Annotated[Optional[str], "Comma-separated tags (Save only)"] = None,
    query: Annotated[Optional[str], "Search query terms (Search only)"] = None,
    limit: Annotated[int, "Max results (List/Search only)"] = 20,
    memory_id: Annotated[Optional[str], "Full UUID or unique prefix (Delete only)"] = None,
    include_tags: Annotated[bool, "Include tags (List/Search only)"] = True,
) -> str:
    """Manage lightweight local JSON “memories” (save | list | search | delete) in one MCP tool.

    Overview:
        This tool provides simple, local, append‑only style persistence for short text memories
        with optional tags. Data is stored in a plaintext JSON file ("memories.json") beside the
        application; no external database or network access is required.

    Supported Actions:
        - save   : Store a new memory (requires 'text'; optional 'tags').
        - list   : Return the most recent memories (respects 'limit' + 'include_tags').
        - search : AND match space‑separated terms across text and tags (uses 'query', 'limit').
        - delete : Remove one memory by full UUID or unique prefix (uses 'memory_id').

    Parameter Usage by Action:
        action=save   -> text (required), tags (optional)
        action=list   -> limit, include_tags
        action=search -> query (required), limit, include_tags
        action=delete -> memory_id (required)

    Parameters:
        action (Literal[save|list|search|delete]): Operation selector (case-insensitive).
        text (str): Raw memory content; leading/trailing whitespace trimmed (save only).
        tags (str): Optional comma-separated tags; stored verbatim (save only).
        query (str): Space-separated terms (AND logic, case-insensitive) across text+tags (search only).
        limit (int): Maximum rows for list/search (clamped internally to 1–200).
        memory_id (str): Full UUID or unique prefix (>=4 chars) (delete only).
        include_tags (bool): When True, show tag column in list/search output.

    Storage Format (per entry):
        {"id": "<uuid4>", "text": "<original text>", "timestamp": "YYYY-MM-DD HH:MM:SS", "tags": "tag1, tag2"}

    Lifecycle & Constraints:
        - A soft cap of {_MAX_MEMORIES} entries is enforced by pruning oldest records on save.
        - A light duplicate guard skips saving if the newest existing entry has identical text.
        - All operations are protected by an in‑process reentrant lock (NOT multi‑process safe).

    Returns:
        str: Human‑readable status / result lines (never raw JSON) suitable for direct model consumption.

    Error Modes:
        - Invalid action -> error string.
        - Missing required field for the chosen action -> explanatory message.
        - Ambiguous or unknown memory_id on delete -> clarification message.

    Security & Privacy:
        Plaintext JSON; do not store secrets, credentials, or regulated personal data.
    """
    act = (action or "").lower().strip()

    # Normalize optional fields so the helpers below always receive strings.
    text = text or ""
    tags = tags or ""
    query = query or ""
    memory_id = memory_id or ""

    if act == "save":
        if not text.strip():
            return "Error: 'text' is required when action=save."
        return _mem_save(text=text, tags=tags)
    if act == "list":
        return _mem_list(limit=limit, include_tags=include_tags)
    if act == "search":
        if not query.strip():
            return "Error: 'query' is required when action=search."
        return _mem_search(query=query, limit=limit)
    if act == "delete":
        if not memory_id.strip():
            return "Error: 'memory_id' is required when action=delete."
        return _mem_delete(memory_id=memory_id)
    return "Error: invalid action (use save|list|search|delete)."
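
# Hedged usage sketch (not executed): a save/search/list round trip against
# Memory_Manager. Returned values are human-readable status strings, and the
# example text/tags are illustrative only.
def _example_memory_roundtrip() -> None:
    print(Memory_Manager("save", text="User prefers dark mode", tags="preferences, ui"))
    print(Memory_Manager("search", query="dark mode"))
    print(Memory_Manager("list", limit=5))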
					
						
memory_interface = gr.Interface(
    fn=Memory_Manager,
    inputs=[
        gr.Dropdown(label="Action", choices=["save", "list", "search", "delete"], value="list"),
        gr.Textbox(label="Text", lines=3, placeholder="Memory text (save)"),
        gr.Textbox(label="Tags", placeholder="tag1, tag2"),
        gr.Textbox(label="Query", placeholder="Search terms (search)"),
        gr.Slider(1, 200, value=20, step=1, label="Limit"),
        gr.Textbox(label="Memory ID / Prefix", placeholder="UUID or prefix (delete)"),
        gr.Checkbox(value=True, label="Include Tags"),
    ],
    outputs=gr.Textbox(label="Result", lines=14),
    title="Memory Manager",
    description=(
        "<div style=\"text-align:center\">Lightweight local JSON memory store (no external DB). Choose an Action, fill only the relevant fields, and run.</div>"
    ),
    api_description=(
        "Manage short text memories with optional tags. Actions: save(text,tags), list(limit,include_tags), "
        "search(query,limit,include_tags), delete(memory_id). Returns human-readable plaintext (storage is a local plaintext JSON file). "
        "The action parameter is always required. "
        "Use Memory_Manager whenever you are given information worth remembering about the user, and search for memories when relevant."
    ),
    flagging_mode="never",
)
					
						
HF_API_TOKEN = os.getenv("HF_READ_TOKEN")


def Generate_Image(
    prompt: Annotated[str, "Text description of the image to generate."],
    model_id: Annotated[str, "Hugging Face model id in the form 'creator/model-name' (e.g., black-forest-labs/FLUX.1-Krea-dev)."] = "black-forest-labs/FLUX.1-Krea-dev",
    negative_prompt: Annotated[str, "What should NOT appear in the image."] = (
        "(deformed, distorted, disfigured), poorly drawn, bad anatomy, wrong anatomy, extra limb, "
        "missing limb, floating limbs, (mutated hands and fingers), disconnected limbs, mutation, "
        "mutated, ugly, disgusting, blurry, amputation, misspellings, typos"
    ),
    steps: Annotated[int, "Number of denoising steps (1–100). Higher = slower, potentially higher quality."] = 35,
    cfg_scale: Annotated[float, "Classifier-free guidance scale (1–20). Higher = follow the prompt more closely."] = 7.0,
    sampler: Annotated[str, "Sampling method label (UI only). Common options: 'DPM++ 2M Karras', 'DPM++ SDE Karras', 'Euler', 'Euler a', 'Heun', 'DDIM'."] = "DPM++ 2M Karras",
    seed: Annotated[int, "Random seed for reproducibility. Use -1 for a random seed per call."] = -1,
    width: Annotated[int, "Output width in pixels (64–1216, multiple of 32 recommended)."] = 1024,
    height: Annotated[int, "Output height in pixels (64–1216, multiple of 32 recommended)."] = 1024,
) -> Image.Image:
    """
    Generate a single image from a text prompt using a Hugging Face model via serverless inference.

    Args:
        prompt (str): Text description of the image to generate.
        model_id (str): The Hugging Face model id (creator/model-name). Defaults to "black-forest-labs/FLUX.1-Krea-dev".
        negative_prompt (str): What should NOT appear in the image.
        steps (int): Number of denoising steps (1–100). Higher can improve quality.
        cfg_scale (float): Guidance scale (1–20). Higher = follow the prompt more closely.
        sampler (str): Sampling method label for UI; not all providers expose this control.
        seed (int): Random seed. Use -1 to randomize on each call.
        width (int): Output width in pixels (64–1216; multiples of 32 recommended).
        height (int): Output height in pixels (64–1216; multiples of 32 recommended).

    Returns:
        PIL.Image.Image: The generated image.

    Error modes:
        - Raises gr.Error with a user-friendly message on auth/model/load errors.
    """
    _log_call_start("Generate_Image", prompt=_truncate_for_log(prompt, 200), model_id=model_id, steps=steps, cfg_scale=cfg_scale, seed=seed, size=f"{width}x{height}")
    if not prompt or not prompt.strip():
        _log_call_end("Generate_Image", "error=empty prompt")
        raise gr.Error("Please provide a non-empty prompt.")

    enhanced_prompt = f"{prompt} | ultra detail, ultra elaboration, ultra quality, perfect."

    # Try providers in order until one succeeds.
    providers = ["auto", "replicate", "fal-ai"]
    last_error: Exception | None = None

    for provider in providers:
        try:
            client = InferenceClient(api_key=HF_API_TOKEN, provider=provider)
            image = client.text_to_image(
                prompt=enhanced_prompt,
                negative_prompt=negative_prompt,
                model=model_id,
                width=width,
                height=height,
                num_inference_steps=steps,
                guidance_scale=cfg_scale,
                seed=seed if seed != -1 else random.randint(1, 1_000_000_000),
            )
            _log_call_end("Generate_Image", f"provider={provider} size={image.size}")
            return image
        except Exception as e:
            last_error = e
            continue

    # All providers failed; map common HTTP status codes to friendly messages.
    msg = str(last_error) if last_error else "Unknown error"
    if "404" in msg:
        raise gr.Error(f"Model not found or unavailable: {model_id}. Check the id and your HF token access.")
    if "503" in msg:
        raise gr.Error("The model is warming up. Please try again shortly.")
    if "401" in msg or "403" in msg:
        raise gr.Error("Authentication failed. Set HF_READ_TOKEN environment variable with access to the model.")
    _log_call_end("Generate_Image", f"error={_truncate_for_log(msg, 200)}")
    raise gr.Error(f"Image generation failed: {msg}")
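
# Hedged usage sketch (not invoked; requires HF_READ_TOKEN): generate an image
# and persist it to the OS temp directory. The filename prefix is arbitrary.
def _example_generate_and_save(prompt: str = "a serene mountain landscape at sunset") -> str:
    image = Generate_Image(prompt=prompt, steps=20, width=512, height=512)
    out_path = os.path.join(tempfile.gettempdir(), f"flux_{uuid.uuid4().hex[:8]}.png")
    image.save(out_path)
    return out_path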
					
						
image_generation_interface = gr.Interface(
    fn=Generate_Image,
    inputs=[
        gr.Textbox(label="Prompt", placeholder="Enter a prompt", lines=2),
        gr.Textbox(label="Model", value="black-forest-labs/FLUX.1-Krea-dev", placeholder="creator/model-name"),
        gr.Textbox(
            label="Negative Prompt",
            value=(
                "(deformed, distorted, disfigured), poorly drawn, bad anatomy, wrong anatomy, extra limb, "
                "missing limb, floating limbs, (mutated hands and fingers), disconnected limbs, mutation, "
                "mutated, ugly, disgusting, blurry, amputation, misspellings, typos"
            ),
            lines=2,
        ),
        gr.Slider(minimum=1, maximum=100, value=35, step=1, label="Steps"),
        gr.Slider(minimum=1.0, maximum=20.0, value=7.0, step=0.1, label="CFG Scale"),
        gr.Radio(label="Sampler", value="DPM++ 2M Karras", choices=[
            "DPM++ 2M Karras", "DPM++ SDE Karras", "Euler", "Euler a", "Heun", "DDIM"
        ]),
        gr.Slider(minimum=-1, maximum=1_000_000_000, value=-1, step=1, label="Seed (-1 = random)"),
        gr.Slider(minimum=64, maximum=1216, value=1024, step=32, label="Width"),
        gr.Slider(minimum=64, maximum=1216, value=1024, step=32, label="Height"),
    ],
    outputs=gr.Image(label="Generated Image"),
    title="Image Generation",
    description=(
        "<div style=\"text-align:center\">Generate images via Hugging Face serverless inference. "
        "Default model is FLUX.1-Krea-dev.</div>"
    ),
    api_description=(
        "Generate a single image from a text prompt using a Hugging Face model via serverless inference. "
        "Supports creative prompts like 'a serene mountain landscape at sunset', 'portrait of a wise owl', "
        "'futuristic city with flying cars'. Default model: FLUX.1-Krea-dev. "
        "Parameters: prompt (str), model_id (str, creator/model-name), negative_prompt (str), steps (int, 1–100), "
        "cfg_scale (float, 1–20), sampler (str), seed (int, -1=random), width/height (int, 64–1216). "
        "Returns a PIL.Image. Return the generated media to the user in this format ``"
    ),
    flagging_mode="never",
)
					
						
def _write_video_tmp(data_iter_or_bytes: object, suffix: str = ".mp4") -> str:
    """Write video bytes or an iterable of bytes to a system temporary file and return its path.

    This avoids polluting the project directory. The file is created in the OS temp
    location; Gradio will handle serving & offering the download button.
    """
    fd, fname = tempfile.mkstemp(suffix=suffix)
    try:
        with os.fdopen(fd, "wb") as f:
            if isinstance(data_iter_or_bytes, (bytes, bytearray)):
                f.write(data_iter_or_bytes)
            elif hasattr(data_iter_or_bytes, "read"):
                f.write(data_iter_or_bytes.read())
            elif hasattr(data_iter_or_bytes, "content"):
                f.write(data_iter_or_bytes.content)
            elif hasattr(data_iter_or_bytes, "__iter__") and not isinstance(data_iter_or_bytes, (str, dict)):
                for chunk in data_iter_or_bytes:
                    if chunk:
                        f.write(chunk)
            else:
                raise gr.Error("Unsupported video data type returned by provider.")
    except Exception:
        # Remove the partially written temp file before re-raising.
        try:
            os.remove(fname)
        except Exception:
            pass
        raise
    return fname
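
# Hedged usage sketch (not invoked): _write_video_tmp accepts raw bytes,
# file-like objects, objects exposing .content (e.g. requests responses), or
# an iterable of chunks. The bytes below are a placeholder header, not a
# playable MP4.
def _example_write_video_bytes() -> str:
    return _write_video_tmp(b"\x00\x00\x00\x18ftypmp42", suffix=".mp4")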
					
						
HF_VIDEO_TOKEN = os.getenv("HF_READ_TOKEN") or os.getenv("HF_TOKEN")


def Generate_Video(
    prompt: Annotated[str, "Text description of the video to generate (e.g., 'a red fox running through a snowy forest at sunrise')."],
    model_id: Annotated[str, "Hugging Face model id in the form 'creator/model-name'. Defaults to Wan-AI/Wan2.2-T2V-A14B."] = "Wan-AI/Wan2.2-T2V-A14B",
    negative_prompt: Annotated[str, "What should NOT appear in the video."] = "",
    steps: Annotated[int, "Number of denoising steps (1–100). Higher can improve quality but is slower."] = 25,
    cfg_scale: Annotated[float, "Guidance scale (1–20). Higher = follow the prompt more closely, lower = more creative."] = 3.5,
    seed: Annotated[int, "Random seed for reproducibility. Use -1 for a random seed per call."] = -1,
    width: Annotated[int, "Output width in pixels (multiples of 8 recommended)."] = 768,
    height: Annotated[int, "Output height in pixels (multiples of 8 recommended)."] = 768,
    fps: Annotated[int, "Frames per second of the output video (e.g., 24)."] = 24,
    duration: Annotated[float, "Target duration in seconds (provider/model dependent, commonly 2–6s)."] = 4.0,
) -> str:
    """
    Generate a short video from a text prompt using a Hugging Face model via serverless inference.

    Args:
        prompt (str): Text description of the video to generate.
        model_id (str): The Hugging Face model id (creator/model-name). Defaults to "Wan-AI/Wan2.2-T2V-A14B".
        negative_prompt (str): What should NOT appear in the video.
        steps (int): Number of denoising steps (1–100). Higher can improve quality but is slower.
        cfg_scale (float): Guidance scale (1–20). Higher = follow the prompt more closely.
        seed (int): Random seed. Use -1 to randomize on each call.
        width (int): Output width in pixels.
        height (int): Output height in pixels.
        fps (int): Frames per second.
        duration (float): Target duration in seconds.

    Returns:
        str: Path to an MP4 file on disk (Gradio will serve this file; MCP converts it to a file URL).

    Error modes:
        - Raises gr.Error with a user-friendly message on auth/model/load errors or unsupported parameters.
    """
    _log_call_start("Generate_Video", prompt=_truncate_for_log(prompt, 160), model_id=model_id, steps=steps, cfg_scale=cfg_scale, fps=fps, duration=duration, size=f"{width}x{height}")
    if not prompt or not prompt.strip():
        _log_call_end("Generate_Video", "error=empty prompt")
        raise gr.Error("Please provide a non-empty prompt.")

    if not HF_VIDEO_TOKEN:
        # No token found; proceed anyway and let the provider surface any auth error.
        pass

    providers = ["auto", "replicate", "fal-ai"]
    last_error: Exception | None = None

    # Parameter payload for the raw-POST fallback below.
    parameters = {
        "negative_prompt": negative_prompt or None,
        "num_inference_steps": steps,
        "guidance_scale": cfg_scale,
        "seed": seed if seed != -1 else random.randint(1, 1_000_000_000),
        "width": width,
        "height": height,
        "fps": fps,
        "duration": duration,
    }

    for provider in providers:
        try:
            client = InferenceClient(api_key=HF_VIDEO_TOKEN, provider=provider)

            if hasattr(client, "text_to_video"):
                # Derive a frame count from duration and fps for providers that accept it.
                num_frames = int(duration * fps) if duration and fps else None

                # Dimensions/fps/duration are not part of the typed signature, so pass
                # them through extra_body for providers that understand them.
                extra_body = {}
                if width:
                    extra_body["width"] = width
                if height:
                    extra_body["height"] = height
                if fps:
                    extra_body["fps"] = fps
                if duration:
                    extra_body["duration"] = duration

                result = client.text_to_video(
                    prompt=prompt,
                    model=model_id,
                    guidance_scale=cfg_scale,
                    negative_prompt=[negative_prompt] if negative_prompt else None,
                    num_frames=num_frames,
                    num_inference_steps=steps,
                    seed=parameters["seed"],
                    extra_body=extra_body if extra_body else None,
                )
            else:
                # Older huggingface_hub versions: fall back to a raw POST call.
                result = client.post(
                    model=model_id,
                    json={
                        "inputs": prompt,
                        "parameters": {k: v for k, v in parameters.items() if v is not None},
                    },
                )

            path = _write_video_tmp(result, suffix=".mp4")
            try:
                size = os.path.getsize(path)
            except Exception:
                size = -1
            _log_call_end("Generate_Video", f"provider={provider} path={os.path.basename(path)} bytes={size}")
            return path
        except Exception as e:
            last_error = e
            continue

    msg = str(last_error) if last_error else "Unknown error"
    if "404" in msg:
        raise gr.Error(f"Model not found or unavailable: {model_id}. Check the id and HF token access.")
    if "503" in msg:
        raise gr.Error("The model is warming up. Please try again shortly.")
    if "401" in msg or "403" in msg:
        raise gr.Error("Authentication failed or not permitted. Set HF_READ_TOKEN/HF_TOKEN with inference access.")
    _log_call_end("Generate_Video", f"error={_truncate_for_log(msg, 200)}")
    raise gr.Error(f"Video generation failed: {msg}")
					
						
video_generation_interface = gr.Interface(
    fn=Generate_Video,
    inputs=[
        gr.Textbox(label="Prompt", placeholder="Enter a prompt for the video", lines=2),
        gr.Textbox(label="Model", value="Wan-AI/Wan2.2-T2V-A14B", placeholder="creator/model-name"),
        gr.Textbox(label="Negative Prompt", value="", lines=2),
        gr.Slider(minimum=1, maximum=100, value=25, step=1, label="Steps"),
        gr.Slider(minimum=1.0, maximum=20.0, value=3.5, step=0.1, label="CFG Scale"),
        gr.Slider(minimum=-1, maximum=1_000_000_000, value=-1, step=1, label="Seed (-1 = random)"),
        gr.Slider(minimum=64, maximum=1920, value=768, step=8, label="Width"),
        gr.Slider(minimum=64, maximum=1920, value=768, step=8, label="Height"),
        gr.Slider(minimum=4, maximum=60, value=24, step=1, label="FPS"),
        gr.Slider(minimum=1.0, maximum=10.0, value=4.0, step=0.5, label="Duration (s)"),
    ],
    outputs=gr.Video(label="Generated Video", show_download_button=True, format="mp4"),
    title="Video Generation",
    description=(
        "<div style=\"text-align:center\">Generate short videos via Hugging Face serverless inference. "
        "Default model is Wan2.2-T2V-A14B.</div>"
    ),
    api_description=(
        "Generate a short video from a text prompt using a Hugging Face model via serverless inference. "
        "Create dynamic scenes like 'a red fox running through a snowy forest at sunrise', 'waves crashing on a rocky shore', "
        "'time-lapse of clouds moving across a blue sky'. Default model: Wan2.2-T2V-A14B (2-6 second videos). "
        "Parameters: prompt (str), model_id (str), negative_prompt (str), steps (int), cfg_scale (float), seed (int), "
        "width/height (int), fps (int), duration (float in seconds). Returns MP4 file path. "
        "Return the generated media to the user in this format ``"
    ),
    flagging_mode="never",
)
					
						
HAS_HF_TOKEN = bool(HF_API_TOKEN or HF_VIDEO_TOKEN)

_interfaces = [
    fetch_interface,
    concise_interface,
    code_interface,
    kokoro_interface,
]
_tab_names = [
    "Fetch Webpage",
    "DuckDuckGo Search",
    "Python Code Executor",
    "Kokoro TTS",
]

# Memory Manager is intended for local use, so it is only shown when a read token is present.
HAS_HF_READ = bool(HF_API_TOKEN)
if HAS_HF_READ:
    # Keep _interfaces and _tab_names index-aligned when inserting.
    insert_index = 3 if len(_interfaces) >= 3 else len(_interfaces)
    _interfaces.insert(insert_index, memory_interface)
    _tab_names.insert(insert_index, "Memory Manager")

if HAS_HF_TOKEN:
    _interfaces.extend([image_generation_interface, video_generation_interface])
    _tab_names.extend(["Image Generation", "Video Generation"])

demo = gr.TabbedInterface(
    interface_list=_interfaces,
    tab_names=_tab_names,
    title="Tools MCP",
    theme="Nymbo/Nymbo_Theme",
    css=CSS_STYLES,
)
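
# Hedged note: with mcp_server=True, Gradio builds that include MCP support
# also expose these tools over the Model Context Protocol; the exact endpoint
# URL depends on the Gradio version and is printed to the console at startup.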
					
						
if __name__ == "__main__":
    demo.launch(mcp_server=True)