"""Gradio chat UI for a web-search research agent built on llama-cpp-agent.

The app loads a local GGUF model through llama-cpp-python, lets a search agent
call ``WebSearchTool``, and then streams a research answer followed by the
list of cited sources.
"""
import json
import logging
import os
from datetime import datetime, timezone
from typing import List, Optional

import gradio as gr
from langchain_community.llms import HuggingFaceHub
from llama_cpp import Llama
from llama_cpp_agent import LlamaCppAgent, MessagesFormatterType
from llama_cpp_agent.chat_history import BasicChatHistory
from llama_cpp_agent.chat_history.messages import Roles
from llama_cpp_agent.llm_output_settings import (
    LlmStructuredOutputSettings,
    LlmStructuredOutputType,
)
from llama_cpp_agent.prompt_templates import (
    research_system_prompt,
    web_search_system_prompt,
)
from llama_cpp_agent.providers import LlamaCppPythonProvider
from llama_cpp_agent.tools import WebSearchTool
from pydantic import BaseModel, Field
from trafilatura import extract, fetch_url

# Startup diagnostic: list the structured-output modes the installed
# llama-cpp-agent version exposes.
print("Available LlmStructuredOutputType options:")
for option in LlmStructuredOutputType:
    print(option)

# UI related imports and definitions
css = """
.message-row {
    justify-content: space-evenly !important;
}
.message-bubble-border {
    border-radius: 6px !important;
}
.message-buttons-bot, .message-buttons-user {
    right: 10px !important;
    left: auto !important;
    bottom: 2px !important;
}
.dark.message-bubble-border {
    border-color: #1b0f0f !important;
}
.dark.user {
    background: #140b0b !important;
}
.dark.assistant.dark, .dark.pending.dark {
    background: #0c0505 !important;
}
"""

PLACEHOLDER = """
Logo

llama-cpp-agent

DDG Agent allows users to interact with it using natural language, making it easier for them to find the information they need. Offers a convenient and secure way for users to access web-based information.

Mistral 7B Instruct v0.3 Mixtral 8x7B Instruct v0.1 Meta Llama 3 8B Instruct
Discord GitHub
"""

# Global variables
huggingface_token = os.environ.get("HUGGINGFACE_TOKEN")

# Lazily-loaded llama.cpp model shared across requests. ``respond`` declares
# both names ``global`` and reloads only when the requested model changes, so
# they must exist at module level before the first request.
llm = None
llm_model = None

# File name (inside ``models/``) offered as the default in the UI.
# NOTE(review): the default file name is an assumption - set MODEL_FILE or
# edit this to match the GGUF file actually shipped with the app.
DEFAULT_MODEL_FILE = os.environ.get(
    "MODEL_FILE", "Mistral-7B-Instruct-v0.3-Q6_K.gguf"
)

# Context window requested per model family, matched by substring against the
# lower-cased model file name.
# NOTE(review): values are the published context lengths for these model
# families - confirm they suit the GGUF builds and available memory.
_CONTEXT_LIMITS = (
    (("mistral", "mixtral"), 32768),
    (("meta",), 8192),
)

# Example queries
examples = [
    ["latest news about Yann LeCun"],
    ["Latest news site:github.blog"],
    ["Where I can find best hotel in Galapagos, Ecuador intitle:hotel"],
    ["filetype:pdf intitle:python"],
]


class CustomLLMSettings(BaseModel):
    """Sampling settings returned by ``HuggingFaceHubWrapper``."""

    structured_output: LlmStructuredOutputSettings
    temperature: float = Field(default=0.7)
    top_p: float = Field(default=0.95)
    repetition_penalty: float = Field(default=1.1)
    top_k: int = Field(default=50)
    max_tokens: int = Field(default=1000)
    stop: list[str] = Field(default_factory=list)
    echo: bool = Field(default=False)
    stream: bool = Field(default=False)
    # ``None`` is the default, so the annotation has to allow it.
    logprobs: Optional[int] = Field(default=None)
    presence_penalty: float = Field(default=0.0)
    frequency_penalty: float = Field(default=0.0)
    best_of: int = Field(default=1)
    logit_bias: dict = Field(default_factory=dict)
    max_tokens_per_summary: int = Field(default=2048)


class HuggingFaceHubWrapper:
    """Wrap a ``HuggingFaceHub`` LLM and expose provider-style accessors.

    The sampling values are copied out of ``model_kwargs`` so that
    ``get_provider_default_settings`` can report them later.
    """

    def __init__(self, repo_id, model_kwargs, huggingfacehub_api_token):
        self.model = HuggingFaceHub(
            repo_id=repo_id,
            model_kwargs=model_kwargs,
            huggingfacehub_api_token=huggingfacehub_api_token,
        )
        self.temperature = model_kwargs.get('temperature', 0.7)
        self.top_p = model_kwargs.get('top_p', 0.95)
        self.repetition_penalty = model_kwargs.get('repetition_penalty', 1.1)
        self.top_k = model_kwargs.get('top_k', 50)
        # The generation budget is stored under 'max_length' in model_kwargs.
        self.max_tokens = model_kwargs.get('max_length', 1000)
        self.max_tokens_per_summary = model_kwargs.get('max_tokens_per_summary', 2048)

    def get_provider_default_settings(self):
        """Return a ``CustomLLMSettings`` built from the stored sampling values."""
        return CustomLLMSettings(
            structured_output=LlmStructuredOutputSettings(
                output_type=LlmStructuredOutputType.no_structured_output,
                include_system_prompt=False,
                include_user_prompt=False,
                include_assistant_prompt=False,
            ),
            temperature=self.temperature,
            top_p=self.top_p,
            repetition_penalty=self.repetition_penalty,
            top_k=self.top_k,
            max_tokens=self.max_tokens,
            max_tokens_per_summary=self.max_tokens_per_summary,
        )

    def get_provider_identifier(self):
        """Return the fixed identifier string for this wrapper."""
        return "HuggingFaceHub"

    def __call__(self, *args, **kwargs):
        """Forward the call unchanged to the wrapped ``HuggingFaceHub`` model."""
        return self.model(*args, **kwargs)

    def get_num_tokens(self, text):
        """Approximate the token count of ``text`` by whitespace splitting."""
        # This is a placeholder. You might need to implement a proper token counting method
        return len(text.split())

    def get_max_tokens(self):
        """Return the hard-coded maximum token count (2048)."""
        # This is a placeholder. Return the actual max tokens for your model
        return 2048


# Utility functions
def get_server_time():
    """Return the current UTC time formatted as ``YYYY-MM-DD HH:MM:SS``."""
    utc_time = datetime.now(timezone.utc)
    return utc_time.strftime("%Y-%m-%d %H:%M:%S")


def get_website_content_from_url(url: str) -> str:
    """Download ``url`` and return its extracted text wrapped in banner lines.

    Returns an empty string when nothing could be downloaded or extracted, and
    an ``"An error occurred: ..."`` string when any step raises.
    """
    try:
        downloaded = fetch_url(url)
        # A failed download yields None; there is nothing to extract then.
        if downloaded is None:
            return ""

        result = extract(
            downloaded,
            include_formatting=True,
            include_links=True,
            output_format='json',
            url=url,
        )
        if not result:
            return ""

        result = json.loads(result)
        return (
            f'=========== Website Title: {result["title"]} ===========\n\n'
            f'=========== Website URL: {url} ===========\n\n'
            f'=========== Website Content ===========\n\n'
            f'{result["raw_text"]}\n\n'
            f'=========== Website Content End ===========\n\n'
        )
    except Exception as e:
        # Deliberate best-effort: callers get a message instead of a crash.
        return f"An error occurred: {str(e)}"


class CitingSources(BaseModel):
    """Structured-output schema used to ask the model for its sources."""

    sources: List[str] = Field(
        ...,
        description="List of sources to cite. Should be an URL of the source. E.g. GitHub URL, Blogpost URL or Newsletter URL."
    )


# Model function
def get_model(temperature, top_p, repetition_penalty, top_k=50, max_tokens=1000, max_tokens_per_summary=2048):
    """Build a ``HuggingFaceHubWrapper`` for Mistral-7B-Instruct-v0.3.

    The sampling arguments are forwarded as ``model_kwargs``; ``max_tokens``
    is passed under the ``max_length`` key that the wrapper reads back.
    """
    return HuggingFaceHubWrapper(
        repo_id="mistralai/Mistral-7B-Instruct-v0.3",
        model_kwargs={
            "temperature": temperature,
            "top_p": top_p,
            "repetition_penalty": repetition_penalty,
            "top_k": top_k,
            "max_length": max_tokens,
            "max_tokens_per_summary": max_tokens_per_summary,
        },
        huggingfacehub_api_token=huggingface_token,
    )


def get_context_by_model(model_name):
    """Return the context size to request from llama.cpp for ``model_name``.

    Falls back to ``0`` for unknown models, which llama-cpp-python treats as
    "use the context length stored in the model file".
    """
    lowered = model_name.lower()
    for keywords, limit in _CONTEXT_LIMITS:
        if any(keyword in lowered for keyword in keywords):
            return limit
    return 0


def get_messages_formatter_type(model_name):
    """Pick the chat template for a model from keywords in its name.

    The checks run in order and the first match wins; unknown names fall back
    to ChatML.
    """
    model_name = model_name.lower()
    keyword_table = (
        (("meta", "aya"), MessagesFormatterType.LLAMA_3),
        (("mistral", "mixtral"), MessagesFormatterType.MISTRAL),
        (("einstein", "dolphin"), MessagesFormatterType.CHATML),
        (("phi",), MessagesFormatterType.PHI_3),
    )
    for keywords, formatter in keyword_table:
        if any(keyword in model_name for keyword in keywords):
            return formatter
    return MessagesFormatterType.CHATML


# Main response function
def respond(
    message,
    history: list[tuple[str, str]],
    model,
    system_message,
    max_tokens,
    temperature,
    top_p,
    top_k,
    repeat_penalty,
    max_tokens_per_summary=2048,
):
    """Answer ``message`` with a web search followed by a streamed report.

    Generator used as the Gradio chat callback. It yields the growing answer
    text while the model streams, then yields once more with the cited
    sources appended.

    Args:
        message: The user's current request.
        history: Earlier ``(user, assistant)`` turns of the conversation.
        model: Model file name inside ``models/``.
        system_message: System prompt for the search agent; an empty value
            falls back to ``web_search_system_prompt``.
        max_tokens: Generation limit applied to the sampling settings.
        temperature: Sampling temperature.
        top_p: Nucleus sampling threshold.
        top_k: Top-k sampling cutoff.
        repeat_penalty: Repetition penalty.
        max_tokens_per_summary: Per-summary budget handed to ``WebSearchTool``.
    """
    global llm
    global llm_model

    chat_template = get_messages_formatter_type(model)

    # Reuse the loaded model unless a different file was requested.
    if llm is None or llm_model != model:
        llm = Llama(
            model_path=f"models/{model}",
            flash_attn=True,
            n_gpu_layers=81,
            n_batch=1024,
            n_ctx=get_context_by_model(model),
        )
        llm_model = model

    provider = LlamaCppPythonProvider(llm)
    logging.info("Loaded chat examples: %s", chat_template)

    search_tool = WebSearchTool(
        llm_provider=provider,
        message_formatter_type=chat_template,
        max_tokens_search_results=12000,
        max_tokens_per_summary=int(max_tokens_per_summary),
    )

    web_search_agent = LlamaCppAgent(
        provider,
        system_prompt=system_message or web_search_system_prompt,
        predefined_messages_formatter_type=chat_template,
        debug_output=True,
    )

    answer_agent = LlamaCppAgent(
        provider,
        system_prompt=research_system_prompt,
        predefined_messages_formatter_type=chat_template,
        debug_output=True,
    )

    settings = provider.get_provider_default_settings()
    settings.stream = False
    settings.temperature = temperature
    settings.top_k = top_k
    settings.top_p = top_p
    settings.max_tokens = max_tokens
    settings.repeat_penalty = repeat_penalty

    output_settings = LlmStructuredOutputSettings.from_functions(
        [search_tool.get_tool()]
    )

    # Replay the earlier turns so the answer agent sees the conversation.
    messages = BasicChatHistory()
    for user_text, assistant_text in history:
        messages.add_message({"role": Roles.user, "content": user_text})
        messages.add_message({"role": Roles.assistant, "content": assistant_text})

    # Step 1: let the search agent call the web-search tool.
    result = web_search_agent.get_chat_response(
        message,
        llm_sampling_settings=settings,
        structured_output_settings=output_settings,
        add_message_to_chat_history=False,
        add_response_to_chat_history=False,
        print_output=False,
    )

    # Step 2: stream the research answer built from the search results.
    outputs = ""
    settings.stream = True
    response_text = answer_agent.get_chat_response(
        f"Write a detailed and complete research document that fulfills the following user request: '{message}', based on the information from the web below.\n\n"
        + result[0]["return_value"],
        role=Roles.tool,
        llm_sampling_settings=settings,
        chat_history=messages,
        returns_streaming_generator=True,
        print_output=False,
    )

    for text in response_text:
        outputs += text
        yield outputs

    # Step 3: ask for the sources as a structured ``CitingSources`` object.
    output_settings = LlmStructuredOutputSettings.from_pydantic_models(
        [CitingSources], LlmStructuredOutputType.object_instance
    )

    citing_sources = answer_agent.get_chat_response(
        "Cite the sources you used in your response.",
        role=Roles.tool,
        llm_sampling_settings=settings,
        chat_history=messages,
        returns_streaming_generator=False,
        structured_output_settings=output_settings,
        print_output=False,
    )
    outputs += "\n\nSources:\n"
    outputs += "\n".join(citing_sources.sources)
    yield outputs


# Gradio interface
# The additional inputs are passed to ``respond`` positionally after
# ``message`` and ``history``, so their order must match its parameters:
# model, system_message, max_tokens, temperature, top_p, top_k,
# repeat_penalty, max_tokens_per_summary.
demo = gr.ChatInterface(
    respond,
    additional_inputs=[
        gr.Textbox(value=DEFAULT_MODEL_FILE, label="Model file (in models/)"),
        gr.Textbox(value=web_search_system_prompt, label="System message"),
        gr.Slider(minimum=1, maximum=1000, value=1000, step=1, label="Max tokens"),
        gr.Slider(minimum=0.1, maximum=1.0, value=0.7, step=0.1, label="Temperature"),
        gr.Slider(minimum=0.1, maximum=1.0, value=0.95, step=0.05, label="Top-p"),
        gr.Slider(minimum=1, maximum=100, value=50, step=1, label="Top-k"),
        gr.Slider(minimum=0.0, maximum=2.0, value=1.1, step=0.1, label="Repetition penalty"),
        gr.Slider(minimum=1, maximum=4096, value=2048, step=1, label="Max tokens per summary"),
    ],
    theme=gr.themes.Soft(
        primary_hue="orange",
        secondary_hue="amber",
        neutral_hue="gray",
        font=[gr.themes.GoogleFont("Exo"), "ui-sans-serif", "system-ui", "sans-serif"],
    ).set(
        body_background_fill_dark="#0c0505",
        block_background_fill_dark="#0c0505",
        block_border_width="1px",
        block_title_background_fill_dark="#1b0f0f",
        input_background_fill_dark="#140b0b",
        button_secondary_background_fill_dark="#140b0b",
        border_color_accent_dark="#1b0f0f",
        border_color_primary_dark="#1b0f0f",
        background_fill_secondary_dark="#0c0505",
        color_accent_soft_dark="transparent",
        code_background_fill_dark="#140b0b",
    ),
    css=css,
    retry_btn="Retry",
    undo_btn="Undo",
    clear_btn="Clear",
    submit_btn="Send",
    cache_examples=False,
    examples=examples,
    description="Mistral-7B: Chat with DuckDuckGo Agent",
    analytics_enabled=False,
    chatbot=gr.Chatbot(
        scale=1,
        placeholder=PLACEHOLDER,
        show_copy_button=True,
    ),
)

if __name__ == "__main__":
    demo.launch()