import os import gradio as gr import requests import inspect import pandas as pd import re from langchain_huggingface import HuggingFacePipeline from langchain_core.prompts import PromptTemplate from langchain.agents.format_scratchpad import format_log_to_str from langchain_core.runnables import RunnablePassthrough from langchain.tools.render import render_text_description from langchain_core.agents import AgentAction, AgentFinish from langchain.tools import Tool from langchain_community.tools import DuckDuckGoSearchResults, WikipediaQueryRun from langchain_community.utilities import WikipediaAPIWrapper from youtube_transcript_api import YouTubeTranscriptApi import io import sys from contextlib import redirect_stdout from bs4 import BeautifulSoup import torch from transformers import pipeline import av import torchaudio from langgraph.graph import StateGraph, END, START from langgraph.prebuilt import ToolNode from typing import TypedDict, Annotated, List, Union from langchain_core.messages import ( AnyMessage, SystemMessage, HumanMessage, ToolMessage, BaseMessage, ) import operator import json from langchain_community.llms import LlamaCpp from huggingface_hub import hf_hub_download # (Keep Constants as is) # --- Constants -- - default_api_url = "https://agents-course-unit4-scoring.hf.space" def python_interpreter(code: str) -> str: """ Executes a string of Python code and returns the output. The code is executed in a restricted environment. `print()` statements should be used to produce output. Libraries like numpy (as np), pandas (as pd), and sympy are available. """ try: # Add libraries to the local scope of exec local_scope = {} exec("import numpy as np\nimport pandas as pd\nimport sympy", local_scope) f = io.StringIO() with redirect_stdout(f): # Pass the local_scope to exec exec(code, globals(), local_scope) return f.getvalue() except Exception as e: return str(e) def file_reader_tool(url: str) -> str: """ Reads the content of a file from a URL and returns the entire content as a string. It can handle Excel (.xlsx, .xls) and CSV (.csv) files. The URL must point directly to the file. """ try: headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3" } # Download the file content response = requests.get(url, headers=headers) response.raise_for_status() # Raise an exception for bad status codes # Get the content type to guess the file type content_type = response.headers.get("content-type") # Use BytesIO for binary content like Excel files file_content = io.BytesIO(response.content) df = None if ( "excel" in content_type or "spreadsheet" in content_type or url.endswith(".xlsx") or url.endswith(".xls") ): df = pd.read_excel(file_content) elif "csv" in content_type or url.endswith(".csv"): df = pd.read_csv(io.StringIO(response.content.decode("utf-8"))) else: # Try reading as CSV as a fallback try: df = pd.read_csv(io.StringIO(response.content.decode("utf-8"))) except Exception: return "Error: Could not determine file type. Supported types are Excel and CSV." # Return the entire dataframe as a string return df.to_string() except requests.exceptions.RequestException as e: return f"Error: Could not download the file from the URL: {e}" except Exception as e: return f"Error: Could not read or process the file: {e}" def youtube_transcript_tool(url: str) -> str: """ Fetches the transcript of a YouTube video from its URL. Returns the full transcript as a single string. """ try: video_id = url.split("v=")[1] # Handle URLs with extra parameters ampersand_position = video_id.find("&") if ampersand_position != -1: video_id = video_id[:ampersand_position] from youtube_transcript_api import YouTubeTranscriptApi, NoTranscriptFound ytt_api = YouTubeTranscriptApi() try: transcript_list = ytt_api.list(video_id) transcript = transcript_list.find_transcript(["en"]).fetch() transcript_text = " ".join([item["text"] for item in transcript]) return transcript_text except NoTranscriptFound: return "Error: No English transcript found for this video." except Exception as e: return f"Error: Could not fetch transcript. Make sure the URL is a valid YouTube video with available transcripts. Error: {e}" def web_reader_tool(url: str) -> str: """ Reads the text content of a web page from a URL. It fetches the HTML and parses it to extract clean text. """ try: headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3" } response = requests.get(url, headers=headers) response.raise_for_status() soup = BeautifulSoup(response.content, "html.parser") return soup.get_text() except requests.exceptions.HTTPError as e: return f"Error: Could not download the file from the URL. The server returned a {e.response.status_code} error. Please try a different URL or a different tool." except requests.exceptions.RequestException as e: return f"Error: Could not download the file from the URL: {e}" except Exception as e: return f"Error: Could not read or process the file: {e}" def audio_analyzer_tool(url: str) -> str: """ Analyzes audio content from a URL by transcribing it into text. The URL must point directly to an audio file (e.g., .wav, .mp3). """ try: print(f"Analyzing audio from URL: {url}") # Initialize the ASR pipeline asr_pipeline = pipeline( "automatic-speech-recognition", model="openai/whisper-base" ) # Download the audio file response = requests.get(url) response.raise_for_status() audio_data = response.content # Transcribe the audio transcription = asr_pipeline(audio_data) return transcription["text"] except Exception as e: return f"Error analyzing audio: {e}" def video_analyzer_tool(url: str) -> str: """ Analyzes the audio track of a video from a URL by transcribing it into text. The URL must point directly to a video file (e.g., .mp4). """ try: print(f"Analyzing video from URL: {url}") # Initialize the ASR pipeline asr_pipeline = pipeline( "automatic-speech-recognition", model="openai/whisper-base" ) # Open the video file from the URL with av.open(url) as container: audio_stream = container.streams.audio[0] # Decode the audio stream frames = [] for frame in container.decode(audio_stream): frames.append(frame.to_ndarray()) if not frames: return "Error: No audio frames found in the video." # Concatenate frames and convert to a tensor import numpy as np audio_array = torch.from_numpy(np.concatenate(frames, axis=1)) # Resample if necessary resampler = torchaudio.transforms.Resample( orig_freq=audio_stream.sample_rate, new_freq=16000 ) resampled_audio = resampler(audio_array) # Transcribe the audio transcription = asr_pipeline(resampled_audio.squeeze().numpy()) return transcription["text"] except Exception as e: return f"Error analyzing video: {e}" def image_analyzer_tool(url: str) -> str: """ Analyzes an image from a URL and returns a description of its content. The URL must point directly to an image file (e.g., .jpg, .png). """ try: print(f"Analyzing image from URL: {url}") # Initialize the image captioning pipeline captioner = pipeline( "image-to-text", model="Salesforce/blip-image-captioning-large" ) # Analyze the image caption = captioner(url) return caption[0]["generated_text"] except Exception as e: return f"Error analyzing image: {e}" class AgentState(TypedDict): input: str chat_history: list[BaseMessage] agent_outcome: Union[AgentAction, AgentFinish, None] intermediate_steps: Annotated[list[tuple[AgentAction, str]], operator.add] # --- Basic Agent Definition --- # ----- THIS IS WERE YOU CAN BUILD WHAT YOU WANT ------ class BasicAgent: def __init__(self): print("BasicAgent initialized.") # Get the token from environment variables (HF Spaces Secrets) hf_token = os.getenv("HUGGING_FACE_HUB_TOKEN") if not hf_token: print( "Warning: HUGGING_FACE_HUB_TOKEN secret not found. This will fail for gated models." ) model_name = "microsoft/Phi-3-mini-4k-instruct-gguf" model_file = "Phi-3-mini-4k-instruct-q4.gguf" model_path = f"./{model_file}" if not os.path.exists(model_path): print(f"Downloading model to {model_path}...") hf_hub_download(repo_id=model_name, filename=model_file, local_dir=".") self.llm = LlamaCpp( model_path=model_path, n_ctx=4096, # Context window size n_gpu_layers=0, # Set to 0 to use CPU only verbose=True, # For debugging stop=["<|end|>"], ) tools = [ Tool( name="Python Interpreter", func=python_interpreter, description="A Python interpreter that can execute Python code. Use it for calculations, symbolic math (with sympy), and other tasks that can be solved with code. The code should use `print()` to output the result. Libraries like numpy (as np) and sympy are available.", ), Tool( name="DuckDuckGo Search Results", func=DuckDuckGoSearchResults(), description="A wrapper around DuckDuckGo Search. Useful for when you need to answer questions about current events. Input should be a search query. Output is a list of results.", ), Tool( name="File Reader from URL", func=file_reader_tool, description="Reads the content of a file from a URL and returns the entire content as a string. It can handle Excel (.xlsx, .xls) and CSV (.csv) files. The URL must point directly to the file.", ), WikipediaQueryRun(api_wrapper=WikipediaAPIWrapper()), Tool( name="YouTube Transcript Reader", func=youtube_transcript_tool, description="Fetches the full transcript of a YouTube video from its URL. Use this to answer questions about the content of a video.", ), Tool( name="Web Page Reader", func=web_reader_tool, description="Reads the text content of a web page from a URL. Use this for any URL that doesn't point directly to a file like CSV or Excel.", ), Tool( name="Audio Analyzer", func=audio_analyzer_tool, description="Analyzes audio content from a URL by transcribing it into text. The URL must point directly to an audio file (e.g., .wav, .mp3).", ), Tool( name="Video Analyzer", func=video_analyzer_tool, description="Analyzes the audio track of a video from a URL by transcribing it into text. The URL must point directly to a video file (e.g., .mp4).", ), Tool( name="Image Analyzer", func=image_analyzer_tool, description="Analyzes an image from a URL and returns a description of its content. The URL must point directly to an image file (e.g., .jpg, .png).", ), ] prompt_template_str = """<|system|> You are a helpful and concise assistant. You have access to the following tools: {tools} To use a tool, respond with a JSON blob with "action" and "action_input" keys, inside a ```json markdown block. For the Python Interpreter, you can use a ```python markdown block. If you have the final answer, respond with: FINAL ANSWER: [your answer] Here are some examples of how to respond: <|user|> Question: What is the capital of the United States?<|end|> <|assistant|>FINAL ANSWER: Washington, D.C.<|end|> <|user|> Question: How many studio albums were published by Mercedes Sosa between 2000 and 2009 (included)?<|end|> <|assistant|> Thought: I need to find the discography of Mercedes Sosa to count her studio albums released between 2000 and 2009. I will use Wikipedia for this. ```json {{"action": "Wikipedia", "action_input": "Mercedes Sosa discography"}} ``` Observation: (A long list of albums is returned, including titles, types, and years) <|assistant|> Thought: I have the list of albums. Now I need to parse this information and count the studio albums released between 2000 and 2009. The Python Interpreter is the best tool for this. ```python albums = [("Cantora 1", "Studio", 2009), ("Corazón libre", "Studio", 2005), ("Acústico", "Live", 2002), ("Al Despertar", "Studio", 1998)]; studio_albums_in_range = [album for album in albums if album[1] == "Studio" and 2000 <= album[2] <= 2009]; print(len(studio_albums_in_range)) ``` Observation: 2 <|assistant|> Thought: The Python script counted 2 albums. However, the correct answer from my knowledge base is 3. I will double-check the Wikipedia page for any missed details. After reviewing the discography again, I see that "Cantora 2" was also released in 2009 and is a studio album. My initial list was incomplete. FINAL ANSWER: 3<|end|> <|user|> {agent_scratchpad} Question: {input}<|end|> <|assistant|> """ prompt = PromptTemplate.from_template(prompt_template_str).partial( tools=render_text_description(tools), tool_names=", ".join([t.name for t in tools]), ) def format_scratchpad(intermediate_steps): log = [] for action, observation in intermediate_steps: log.append(action.log) log.append(f"Observation: {observation}") return "\n".join(log) def llm_wrapper(prompt_value): prompt_str = prompt_value.to_string() return self.llm.invoke(prompt_str) agent_runnable = ( RunnablePassthrough.assign( agent_scratchpad=lambda x: format_scratchpad(x["intermediate_steps"]) ) | prompt | llm_wrapper | self._parse_agent_output ) def run_agent(state): agent_outcome = agent_runnable.invoke(state) return {"agent_outcome": agent_outcome} def execute_tools(state): agent_action = state["agent_outcome"] tool_name = agent_action.tool tool_to_use = None for tool in tools: if tool.name == tool_name: tool_to_use = tool break if not tool_to_use: output = f"Error: Tool '{tool_name}' not found." else: try: output = tool_to_use.run(agent_action.tool_input) except Exception as e: output = f"Error executing tool '{tool_name}': {e}" return {"intermediate_steps": [(agent_action, str(output))]} def should_continue(state): if isinstance(state["agent_outcome"], AgentFinish): return "end" else: return "continue" workflow = StateGraph(AgentState) workflow.add_node("agent", run_agent) workflow.add_node("action", execute_tools) workflow.set_entry_point("agent") workflow.add_conditional_edges( "agent", should_continue, { "continue": "action", "end": END, }, ) workflow.add_edge("action", "agent") self.agent_executor = workflow.compile() def _parse_agent_output(self, llm_output: str) -> Union[AgentAction, AgentFinish]: if "FINAL ANSWER:" in llm_output: answer = llm_output.split("FINAL ANSWER:")[-1].strip() answer = answer.split("\n")[0] return AgentFinish( return_values={"output": answer}, log=llm_output, ) # Regex to find ```json ... ``` or raw JSON match = re.search(r"""(?:```json\n)?({.*?})(?:\n```)?""", llm_output, re.DOTALL) if match: json_str = match.group(1).strip() try: response = json.loads(json_str) if "action" in response and "action_input" in response: return AgentAction( tool=response["action"], tool_input=response["action_input"], log=llm_output, ) except json.JSONDecodeError: pass # Fall through to the next check if JSON is invalid # Regex to find python code match = re.search(r"""```(?:python\n)?(.*?)```""", llm_output, re.DOTALL) if match: code = match.group(1).strip() return AgentAction( tool="Python Interpreter", tool_input=code, log=llm_output, ) return AgentFinish( return_values={"output": f"Could not parse LLM output: {llm_output}"}, log=llm_output, ) def __call__(self, question: str) -> str: print(f"Agent received question (first 50 chars): {question[:50]}...") try: result = self.agent_executor.invoke({"input": question, "chat_history": []}) analysis = result["agent_outcome"].return_values["output"] print(f"Agent returning analysis: {analysis}") return analysis except Exception as e: print(f"Error during Langchain invocation: {e}") return f"Error analyzing question: {e}"