Spaces:
Sleeping
Sleeping
| import os | |
| import logging | |
| import gradio as gr | |
| from transformers import pipeline | |
| from llama_cpp_agent.providers import LlamaCppPythonProvider | |
| from llama_cpp_agent import LlamaCppAgent, MessagesFormatterType | |
| from llama_cpp_agent.chat_history import BasicChatHistory | |
| from llama_cpp_agent.chat_history.messages import Roles | |
| from llama_cpp_agent.llm_output_settings import ( | |
| LlmStructuredOutputSettings, | |
| LlmStructuredOutputType, | |
| ) | |
| from llama_cpp_agent.tools import WebSearchTool | |
| from llama_cpp_agent.prompt_templates import web_search_system_prompt, research_system_prompt | |
| from pydantic import BaseModel, Field | |
| from trafilatura import fetch_url, extract | |
| import json | |
| from datetime import datetime, timezone | |
| from typing import List | |
| from langchain_community.embeddings import HuggingFaceEmbeddings | |
| from langchain_community.llms import HuggingFaceHub | |
# Module-level cache: the LLM handle and the model name it was built for.
# Both start empty and are filled in by respond() via its `global` statements.
llm = None
llm_model = None

# API token taken from the environment; None when HUGGINGFACE_TOKEN is unset.
huggingface_token = os.getenv("HUGGINGFACE_TOKEN")

# Sample prompts handed to the chat interface; each example is a one-item row.
examples = [
    [prompt]
    for prompt in (
        "latest news about Yann LeCun",
        "Latest news site:github.blog",
        "Where I can find best hotel in Galapagos, Ecuador intitle:hotel",
        "filetype:pdf intitle:python",
    )
]
def get_context_by_model(model_name):
    """Return the context limit recorded for ``model_name``.

    Args:
        model_name: Exact model name to look up (the match is case-sensitive).

    Returns:
        The integer limit for a known model, or ``None`` when the name is
        not in the table.
    """
    known_limits = {"Mistral-7B-Instruct-v0.3": 32768}
    if model_name in known_limits:
        return known_limits[model_name]
    return None
def get_messages_formatter_type(model_name):
    """Pick the chat-message formatter for a model name.

    Args:
        model_name: Model name; compared case-insensitively.

    Returns:
        ``MessagesFormatterType.MISTRAL`` when the name contains "mistral",
        otherwise ``MessagesFormatterType.CHATML``.
    """
    # Guard clause: anything that is not a Mistral model falls back to ChatML.
    if "mistral" not in model_name.lower():
        return MessagesFormatterType.CHATML
    return MessagesFormatterType.MISTRAL
def get_model(temperature, top_p, repetition_penalty):
    """Build a ``HuggingFaceHub`` handle for mistralai/Mistral-7B-Instruct-v0.3.

    Args:
        temperature: Forwarded as the ``temperature`` model kwarg.
        top_p: Forwarded as the ``top_p`` model kwarg.
        repetition_penalty: Forwarded as the ``repetition_penalty`` model kwarg.

    Returns:
        A ``HuggingFaceHub`` instance authenticated with the module-level
        ``huggingface_token``; ``max_length`` is fixed at 1000.
    """
    generation_kwargs = dict(
        temperature=temperature,
        top_p=top_p,
        repetition_penalty=repetition_penalty,
        max_length=1000,
    )
    hub_llm = HuggingFaceHub(
        repo_id="mistralai/Mistral-7B-Instruct-v0.3",
        model_kwargs=generation_kwargs,
        huggingfacehub_api_token=huggingface_token,
    )
    return hub_llm
def get_server_time():
    """Return the current UTC time formatted as ``YYYY-MM-DD HH:MM:SS``."""
    return datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
def get_website_content_from_url(url: str) -> str:
    """Download a page and return its extracted text wrapped in banner lines.

    Args:
        url: Address of the page to fetch.

    Returns:
        A block containing the page title, the URL and the extracted text.
        Returns an empty string when extraction yields nothing, and
        ``"An error occurred: <message>"`` when any exception is raised
        along the way (the error is reported as text, never re-raised).
    """
    try:
        page = fetch_url(url)
        extracted_json = extract(
            page,
            include_formatting=True,
            include_links=True,
            output_format='json',
            url=url,
        )
        # Nothing extracted: report "no content" rather than an error.
        if not extracted_json:
            return ""
        doc = json.loads(extracted_json)
        return (
            f'=========== Website Title: {doc["title"]} ===========\n\n'
            f'=========== Website URL: {url} ===========\n\n'
            '=========== Website Content ===========\n\n'
            f'{doc["raw_text"]}\n\n'
            '=========== Website Content End ===========\n\n'
        )
    except Exception as err:
        # Deliberately best-effort: the failure text becomes the returned content.
        return f"An error occurred: {err}"
# Structured-output schema with a single required field: the list of source
# URLs to cite. respond() passes this class to
# LlmStructuredOutputSettings.from_pydantic_models.
# NOTE: documented with comments rather than a class docstring so the model's
# generated schema stays exactly as before.
class CitingSources(BaseModel):
    # Required (no default): one URL string per cited source.
    sources: List[str] = Field(
        ...,
        description=(
            "List of sources to cite. Should be an URL of the source. "
            "E.g. GitHub URL, Blogpost URL or Newsletter URL."
        ),
    )
def write_message_to_user():
    """Return the fixed instruction asking for a message to the user."""
    instruction = "Please write the message to the user."
    return instruction
| #@spaces.GPU(duration=120) | |
def respond(
    message,
    history: list[tuple[str, str]],
    model,
    system_message,
    max_tokens,
    temperature,
    top_p,
    top_k,
    repeat_penalty,
):
    """Answer a chat message with a web-search-backed research document.

    This is a generator: it runs a web-search agent, then streams the answer
    agent's response, and finally appends the cited sources.

    Args:
        message: The user's current request.
        history: Earlier turns as ``(user_text, assistant_text)`` pairs.
        model: Model name; selects the chat template and keys the LLM cache.
        system_message: Accepted for interface compatibility; not used by
            this function (the agents use the imported system prompts).
        max_tokens: Sampling setting forwarded to the provider settings.
        temperature: Sampling setting forwarded to the provider settings.
        top_p: Sampling setting forwarded to the provider settings.
        top_k: Sampling setting forwarded to the provider settings.
        repeat_penalty: Sampling setting forwarded to the provider settings.

    Yields:
        The accumulated response text after each streamed chunk, then the
        final text with a "Sources:" section when sources were returned.
        If the search step yields no usable tool result, a single
        explanatory message is yielded instead.
    """
    global llm
    global llm_model

    chat_template = get_messages_formatter_type(model)

    # The LLM handle is rebuilt only when the selected model changes.
    # NOTE(review): temperature/top_p/repeat_penalty passed to get_model() are
    # therefore those of the first request for a given model -- confirm intended.
    if llm is None or llm_model != model:
        llm = get_model(temperature, top_p, repeat_penalty)
        llm_model = model

    # NOTE(review): get_model() returns a HuggingFaceHub object; confirm that
    # LlamaCppPythonProvider accepts it.
    provider = LlamaCppPythonProvider(llm)
    # The previous message said "Loaded chat examples" although the value
    # being logged is the chat template.
    logging.info("Using chat template: %s", chat_template)

    search_tool = WebSearchTool(
        llm_provider=provider,
        message_formatter_type=chat_template,
        max_tokens_search_results=12000,
        max_tokens_per_summary=2048,
    )

    web_search_agent = LlamaCppAgent(
        provider,
        system_prompt=web_search_system_prompt,
        predefined_messages_formatter_type=chat_template,
        debug_output=True,
    )

    answer_agent = LlamaCppAgent(
        provider,
        system_prompt=research_system_prompt,
        predefined_messages_formatter_type=chat_template,
        debug_output=True,
    )

    settings = provider.get_provider_default_settings()
    settings.stream = False
    settings.temperature = temperature
    settings.top_k = top_k
    settings.top_p = top_p
    settings.max_tokens = max_tokens
    settings.repeat_penalty = repeat_penalty

    output_settings = LlmStructuredOutputSettings.from_functions(
        [search_tool.get_tool()]
    )

    # Replay earlier turns so the answer agent sees the conversation so far.
    messages = BasicChatHistory()
    for user_text, assistant_text in history:
        messages.add_message({"role": Roles.user, "content": user_text})
        messages.add_message({"role": Roles.assistant, "content": assistant_text})

    search_result = web_search_agent.get_chat_response(
        message,
        llm_sampling_settings=settings,
        structured_output_settings=output_settings,
        add_message_to_chat_history=False,
        add_response_to_chat_history=False,
        print_output=False,
    )

    # The search step is expected to return a list of tool-call records with a
    # "return_value" entry. Anything else (empty list, plain text, None) used
    # to crash here; report it to the user instead.
    try:
        web_context = search_result[0]["return_value"]
    except (IndexError, KeyError, TypeError):
        logging.warning(
            "Web search produced no usable tool result: %r", search_result
        )
        yield (
            "The web search did not return any usable results. "
            "Please try rephrasing your request."
        )
        return
    if not isinstance(web_context, str):
        # Concatenation below requires text.
        web_context = str(web_context)

    outputs = ""

    settings.stream = True
    response_text = answer_agent.get_chat_response(
        f"Write a detailed and complete research document that fulfills the following user request: '{message}', based on the information from the web below.\n\n"
        + web_context,
        role=Roles.tool,
        llm_sampling_settings=settings,
        chat_history=messages,
        returns_streaming_generator=True,
        print_output=False,
    )

    # Stream the answer: each yield carries the full text produced so far.
    for text in response_text:
        outputs += text
        yield outputs

    output_settings = LlmStructuredOutputSettings.from_pydantic_models(
        [CitingSources], LlmStructuredOutputType.object_instance
    )

    citing_sources = answer_agent.get_chat_response(
        "Cite the sources you used in your response.",
        role=Roles.tool,
        llm_sampling_settings=settings,
        chat_history=messages,
        returns_streaming_generator=False,
        structured_output_settings=output_settings,
        print_output=False,
    )

    # Tolerate a response without a usable `sources` list instead of raising,
    # and skip the header entirely when there is nothing to cite.
    sources = getattr(citing_sources, "sources", None) or []
    if sources:
        outputs += "\n\nSources:\n"
        outputs += "\n".join(str(source) for source in sources)
    yield outputs
# Extra controls shown next to the chat box. They are listed in the same order
# as respond()'s parameters after (message, history): model, system_message,
# max_tokens, temperature, top_p, top_k, repeat_penalty.
_extra_inputs = [
    gr.Dropdown(
        ['Mistral-7B-Instruct-v0.3'],
        value="Mistral-7B-Instruct-v0.3",
        label="Model",
    ),
    gr.Textbox(value=web_search_system_prompt, label="System message"),
    gr.Slider(minimum=1, maximum=4096, value=2048, step=1, label="Max tokens"),
    gr.Slider(minimum=0.1, maximum=1.0, value=0.45, step=0.1, label="Temperature"),
    gr.Slider(minimum=0.1, maximum=1.0, value=0.95, step=0.05, label="Top-p"),
    gr.Slider(minimum=0, maximum=100, value=40, step=1, label="Top-k"),
    gr.Slider(minimum=0.0, maximum=2.0, value=1.1, step=0.1, label="Repetition penalty"),
]

# Colour and border overrides applied on top of the Soft base theme.
_theme_overrides = dict(
    body_background_fill_dark="#0c0505",
    block_background_fill_dark="#0c0505",
    block_border_width="1px",
    block_title_background_fill_dark="#1b0f0f",
    input_background_fill_dark="#140b0b",
    button_secondary_background_fill_dark="#140b0b",
    border_color_accent_dark="#1b0f0f",
    border_color_primary_dark="#1b0f0f",
    slider_color="#ff911a",
    button_primary_background_fill="#ff911a",
    button_primary_background_fill_dark="#ff911a",
    button_primary_text_color="#f9f9f9",
    button_primary_text_color_dark="#f9f9f9",
)

_theme = gr.themes.Soft(
    primary_hue="orange",
    secondary_hue="amber",
    neutral_hue="gray",
    font=[gr.themes.GoogleFont("Exo"), "ui-sans-serif", "system-ui", "sans-serif"],
).set(**_theme_overrides)

# Chat UI wired to respond(); the queue is enabled before launching.
demo = gr.ChatInterface(
    respond,
    additional_inputs=_extra_inputs,
    theme=_theme,
    examples=examples,
    title="llama.cpp agent",
)

demo.queue().launch()