import base64
import io
import json
import os
import subprocess
from email.message import Message
from io import StringIO
from pathlib import Path
from typing import List

import av
import pandas as pd
import requests
import yt_dlp
from bs4 import BeautifulSoup
from langchain_core.messages import BaseMessage, HumanMessage, SystemMessage
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langchain_tavily import TavilyExtract, TavilySearch
from pydantic import SecretStr

TAVILY_API_KEY = os.getenv("TAVILY_API_KEY", "")
OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY", "")
YOUTUBE_FRAME_ASSESSMENT_MODEL = os.getenv("YOUTUBE_FRAME_ASSESSMENT_MODEL", "google/gemini-2.5-flash-preview-05-20")
YOUTUBE_CONFIRMATION_MODEL = os.getenv("YOUTUBE_CONFIRMATION_MODEL", "google/gemini-2.5-flash-preview-05-20")


# Define Tools for the Agent
@tool(parse_docstring=True, error_on_invalid_docstring=False)
def download_file_from_url(url: str, filename_override: str | None = None) -> str:
    """
    Downloads a file from a URL to a directory in the cwd.

    Prefer to use the filename associated with the URL, but it can be overridden if directed to.

    Filename Logic:
    1. If `filename_override` is provided, it is used directly.
    2. Otherwise, the filename is extracted from the 'Content-Disposition' HTTP header
       using Python's `email.message.Message` parser. The result is sanitized.
    3. If no filename is provided via override and none can be determined from the header,
       a ValueError is raised.

    Args:
        url: The URL of the file to download.
        filename_override: Optional. If provided, this exact name is used for the downloaded file.
            Using the name associated with the URL is recommended (but may require identifying the extension).

    Returns:
        A confirmation message containing the full path to the downloaded file.

    Raises:
        requests.exceptions.RequestException: For HTTP errors (e.g., 404, network issues).
        IOError: If the file cannot be written.
        ValueError: If no filename can be determined (neither provided via override nor
            found in the Content-Disposition header).
    """
    try:
        with requests.Session() as session:
            with session.get(url, stream=True, allow_redirects=True, timeout=30) as response:
                response.raise_for_status()

                final_filename = None
                if filename_override:
                    final_filename = filename_override
                    print(f"Using provided filename: {final_filename}")
                else:
                    content_disposition = response.headers.get('content-disposition')
                    if content_disposition:
                        msg = Message()
                        msg['Content-Disposition'] = content_disposition
                        filename_from_header = msg.get_filename()  # Handles various encodings
                        if filename_from_header:
                            # Sanitize by taking only the basename to prevent path traversal
                            final_filename = os.path.basename(filename_from_header)
                            print(f"Using filename from Content-Disposition: {final_filename}")

                if not final_filename:
                    raise ValueError(
                        "No filename could be determined. "
                        "None was provided as an override, and it could not be "
                        "extracted from the Content-Disposition header."
                    )

                current_dir = Path.cwd()
                temp_dir = current_dir / "temp_downloads"
                temp_dir.mkdir(parents=True, exist_ok=True)
                local_filepath = os.path.join(temp_dir, final_filename)

                with open(local_filepath, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=8192):
                        if chunk:
                            f.write(chunk)

                # print(f"File downloaded to: {local_filepath}")
                return_str = f"File downloaded successfully.\nLocal File Path: {local_filepath}"
                return return_str
    except requests.exceptions.RequestException as e:
        print(f"Error during download from {url}: {e}")
        raise
    except IOError as e:
        print(f"Error writing file: {e}")
        raise
    # ValueError will propagate if raised


@tool(parse_docstring=True, error_on_invalid_docstring=False)
def basic_web_search(query: str, search_domains: list[str] | None = None) -> str:
    """
    Perform a web search using Tavily.

    Useful for retrieving relevant URLs and content summaries based on a search query.
    The content returned by this tool is limited. For more detailed content extraction,
    use the `extract_url_content` tool. If you would like to limit the search to specific
    domains, you can pass a list of domains (e.g., ['wikipedia.org', 'example.com']).

    Args:
        query (str): The search query to perform.
        search_domains (None | list[str]): Optional. A list of domains
            (e.g., ['wikipedia.org', 'example.com']) to restrict the search to.
            If None, searches across all domains.

    Returns:
        str: A JSON-formatted string containing the search results, including titles,
            content snippets, and URLs.
    """
    search_tool = TavilySearch(
        api_key=SecretStr(TAVILY_API_KEY),
        max_results=5,
        include_raw_content=False,
        # include_answer=True,
        include_domains=search_domains
    )
    results = search_tool.invoke({"query": query})
    if results and isinstance(results, dict) and len(results["results"]) > 0:
        return_dict = {
            # "answer": "The following is an unconfirmed answer. Confirm it by extracting content from a URL. " + results.get("answer", ""),
            "results": []
        }
        for result in results["results"]:
            if "title" in result and "content" in result and result['score'] > 0.25:  # Filter results based on score
                return_dict["results"].append({
                    "title": result["title"],
                    "url": result["url"],
                    "content": result["content"],
                })
        if len(return_dict["results"]) == 0:
            return "No results found. If the query is too specific, try a more general search term."
        return json.dumps(return_dict, indent=2)
    else:
        return "No results found. If the query is too specific, try a more general search term."


@tool(parse_docstring=True, error_on_invalid_docstring=False)
def extract_url_content(url_list: list[str]) -> str:
    """
    Extracts the content from URLs using Tavily's extract tool.

    This tool is useful for retrieving content from web pages. It will most likely be used
    after a web search to extract content from the URLs returned by the search.

    Args:
        url_list (list[str]): The URLs to extract content from.

    Returns:
        str: The extracted content or an error message if extraction fails.
    """
    extract_tool = TavilyExtract(api_key=SecretStr(TAVILY_API_KEY))
    extract_results = extract_tool.invoke({'urls': url_list})
    if extract_results and 'results' in extract_results and len(extract_results['results']) > 0:
        for i, page_content in enumerate(extract_results['results']):
            del extract_results['results'][i]['images']
            # if len(page_content['raw_content']) > 40000:
            #     extract_results['results'][i]['raw_content'] = page_content['raw_content'][:40000] + '... [truncated]'
        return json.dumps(extract_results['results'], indent=2)
    else:
        return f"No content could be extracted from the provided URLs: {url_list}"
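
# The two Tavily tools above are typically chained: a search to find candidate URLs,
# followed by extraction of the most promising ones. The sketch below is illustrative
# only (the query string is an assumption) and assumes the search returned JSON rather
# than the "No results found" message. LangChain @tool objects are invoked with a dict
# of their arguments:
#
#   hits = json.loads(basic_web_search.invoke({"query": "Mercedes Sosa studio albums",
#                                              "search_domains": ["wikipedia.org"]}))
#   top_urls = [r["url"] for r in hits["results"][:2]]
#   page_text = extract_url_content.invoke({"url_list": top_urls})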

def bs_html_parser(url):
    response = requests.get(url)  # Send a GET request to the URL
    # Check if the request was successful
    if response.status_code == 200:
        return BeautifulSoup(response.text, "html.parser")  # Parse and return the HTML
    else:
        return None  # Return None if the request fails


def get_table_title(table_tag):
    """
    Extracts a title for a given table tag.
    It looks for a caption element, then for the closest preceding heading tag (h1-h6).
    """
    title = "Untitled Table"

    # 1. Check for a caption element within the table
    caption = table_tag.find('caption')
    if caption:
        caption_text = caption.get_text(strip=True)
        if caption_text:  # Ensure caption is not empty and use it
            return caption_text

    # 2. If no caption, look for the closest preceding heading tag (h1-h6)
    headings = ['h1', 'h2', 'h3', 'h4', 'h5', 'h6']
    # find_all_previous gets all previous tags matching criteria, in reverse document order.
    # limit=1 gets the closest one (the last one encountered before the table).
    preceding_headings = table_tag.find_all_previous(headings, limit=1)
    if preceding_headings:
        heading_tag = preceding_headings[0]

        # To get the cleanest text, prefer 'mw-headline' if it exists,
        # otherwise, clone the heading, remove edit sections, and then get text.

        # Try to find a specific 'mw-headline' span first (common in Wikipedia)
        headline_span = heading_tag.find("span", class_="mw-headline")
        if headline_span:
            title_text = headline_span.get_text(strip=True)
        else:
            # Fallback: create a temporary copy of the heading tag to modify it
            # without affecting the main soup.
            temp_heading_soup = BeautifulSoup(str(heading_tag), 'html.parser')
            temp_heading_tag = temp_heading_soup.find(heading_tag.name)
            if temp_heading_tag:
                # Remove "edit" links (span with class "mw-editsection")
                for span in temp_heading_tag.find_all("span", class_="mw-editsection"):
                    span.decompose()
                title_text = temp_heading_tag.get_text(strip=True)
            else:
                # If cloning somehow failed, take raw text (less ideal)
                title_text = heading_tag.get_text(strip=True)

        if title_text:  # Ensure title_text is not empty
            title = title_text

    return title
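
# Illustrative sketch of how the two helpers above are combined (this mirrors what
# wikipedia_reader does below); the URL is an assumption for demonstration:
#
#   soup = bs_html_parser("https://en.wikipedia.org/wiki/List_of_sovereign_states")
#   if soup:
#       for table_tag in soup.find_all("table", {"class": "wikitable"}):
#           print(get_table_title(table_tag))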

@tool(parse_docstring=True, error_on_invalid_docstring=False)
def wikipedia_reader(url: str) -> str:
    """
    Extracts sections, paragraphs, and tables from a Wikipedia page.

    Args:
        url (str): The URL of the Wikipedia page to extract content from.

    Returns:
        str: A JSON string containing sections, paragraphs, and tables.
    """
    soup = bs_html_parser(url)
    if not soup:
        return ""  # Return empty if soup creation failed

    def extract_links(soup_obj):
        links = []
        for link in soup_obj.find_all('a', href=True):
            href = link.get('href')
            # Filter for internal page links (sections)
            if href and href.startswith("#") and "#cite_" not in href and len(href) > 1:
                links.append(url + href)
            # Original logic for other links starting with the base URL (might need adjustment based on desired links)
            # elif href and href.startswith(url):
            #     links.append(href)
        return links

    links = extract_links(soup)

    def extract_paragraphs(soup_obj):
        paragraphs_text = [p.get_text(strip=True) for p in soup_obj.find_all("p")]
        return [p for p in paragraphs_text if p and len(p) > 10]

    paragraphs = extract_paragraphs(soup)

    def extract_tables(soup_obj):
        tables_with_titles = []
        for table_tag in soup_obj.find_all("table", {"class": "wikitable"}):
            title = get_table_title(table_tag)  # Get the title
            try:
                # Pandas read_html expects a string or file-like object
                table_html_str = str(table_tag)
                # Using StringIO to simulate a file, as read_html can be sensitive
                df_list = pd.read_html(StringIO(table_html_str))
                if df_list:
                    df = df_list[0]  # read_html returns a list of DataFrames
                    tables_with_titles.append({"title": title, "table_data": df.to_dict(orient='records')})
                else:
                    tables_with_titles.append({"title": title, "table_data": None, "error": "pd.read_html returned empty list"})
            except Exception as e:
                tables_with_titles.append({"title": title, "table_data": None, "error": str(e)})
        return tables_with_titles

    tables = extract_tables(soup)  # This now returns a list of dicts

    return_dict = {
        "sections": links,
        "paragraphs": paragraphs,
        "tables": tables
    }
    return json.dumps(return_dict, indent=2, ensure_ascii=False)  # Return as JSON string


# Singleton class for the Whisper model.
# We use this so we don't have to load the model multiple times, just once the first time the tool is used.
class WhisperTranscriber:
    _instance = None

    def __new__(cls):
        if cls._instance is None:
            import torch
            from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor
            from transformers.pipelines import pipeline

            device = "cuda:0" if torch.cuda.is_available() else "cpu"
            torch_dtype = torch.float16 if torch.cuda.is_available() else torch.float32
            model_id = "openai/whisper-large-v3"
            model = AutoModelForSpeechSeq2Seq.from_pretrained(
                model_id, torch_dtype=torch_dtype, low_cpu_mem_usage=True, use_safetensors=True
            )
            model.to(device)
            processor = AutoProcessor.from_pretrained(model_id)
            pipe = pipeline(
                "automatic-speech-recognition",
                model=model,
                tokenizer=processor.tokenizer,
                feature_extractor=processor.feature_extractor,
                torch_dtype=torch_dtype,
                device=device,
            )
            cls._instance = pipe
        return cls._instance


@tool(parse_docstring=True, error_on_invalid_docstring=False)
def transcribe_audio_file(file_path: str) -> str:
    """
    Transcribes an audio file to text using OpenAI's Whisper-large-v3 model, caching the model after the first load.

    Args:
        file_path (str): The path to the audio file to transcribe.

    Returns:
        str: The transcription of the audio file.
    """
    pipe = WhisperTranscriber()
    transcription = pipe(file_path)["text"]
    return transcription.strip() if transcription else "No transcription available."

@tool(parse_docstring=True, error_on_invalid_docstring=False)
def question_youtube_video(video_url: str, query: str) -> str:
    """
    Answers a question about a YouTube video.

    The video is streamed and one frame is captured every x seconds, where x is declared
    in the environment settings. Captured frames are sent sequentially to a multimodal
    model to answer the question about the video. The final answer is aggregated from
    the answers to each frame.
    DOES NOT USE AUDIO! ONLY FRAMES FROM THE VIDEO ARE USED TO ANSWER THE QUESTION.

    Args:
        video_url (str): The URL of the video to capture frames from.
        query (str): The question to answer about the video.

    Returns:
        str: The answer to the question about the video.
    """
    CAPTURE_INTERVAL_SEC = int(os.getenv("CAPTURE_INTERVAL_SEC", "2"))  # Default to 2 seconds if not set

    # First, we need to get the video stream URL using yt-dlp
    ydl_opts = {
        "quiet": True,
        "skip_download": True,
        "format": "mp4[ext=mp4]+bestaudio/best",
        "forceurl": True,
        "noplaylist": True,
        "writesubtitles": True,
        "writeautomaticsub": True,
        "subtitlesformat": "vtt",
        "subtitleslangs": ['en'],
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info_dict = ydl.extract_info(video_url, download=False)
        assert isinstance(info_dict, dict), "Failed to extract video information. Please check the video URL."
        stream_url = info_dict.get("url", None)

    if stream_url is None:
        raise ValueError("Could not retrieve video stream URL. Please check the video URL and try again.")

    # Second, we use FFmpeg to capture frames from the video stream
    ffmpeg_cmd = [
        "ffmpeg",
        "-i", stream_url,
        "-f", "matroska",  # container format
        "-",
    ]
    process = subprocess.Popen(
        ffmpeg_cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE
    )
    container = av.open(process.stdout)
    stream = container.streams.video[0]
    time_base = stream.time_base
    if time_base is None:
        raise ValueError("Could not determine time base for the video stream. Please check the video URL and try again.")
    else:
        time_base = float(time_base)

    # Third, we need to use a multimodal model to analyze the video frames.
    image_model = ChatOpenAI(
        model=YOUTUBE_FRAME_ASSESSMENT_MODEL,  # Multimodal model used to assess each frame
        api_key=SecretStr(OPENROUTER_API_KEY),  # Your OpenRouter API key
        base_url="https://openrouter.ai/api/v1",  # Standard OpenRouter API base
        verbose=True  # Optional: for debugging
    )
    image_model_system_prompt = SystemMessage(
        content="You will be shown a frame from a video along with a question about that video and an answer based on the previous frames in the video. "
                "Your task is to analyze the frame and provide an answer to the question using both the current frame and the previous answer. "
                "If the previous answer is reasonable and the current frame cannot answer the question, return the previous answer. "
                "For example, if the question is about the color of a car and the previous answer is 'red' but the current frame shows no car, you should return 'red'. "
                "If the question is about the greatest number of something in the video, you should return the number counted in the current frame or the previous answer, whichever is greater. "
                "For example, if the current frame has 5 objects but the previous answer is 10 objects, you should return '10'. "
                "Be concise and clear in your answers, and do not repeat the question."
    )

    # Then, we loop through the frames and analyze them one by one, skipping frames based on the capture interval
    next_capture_time = 0
    aggregated_answer = ''
    response = ''
    answers_list: List[dict] = []
    for frame in container.decode(stream):
        if frame.pts is None:
            continue
        timestamp = float(frame.pts * time_base)
        if CAPTURE_INTERVAL_SEC is None or timestamp >= next_capture_time:
            # Convert the frame to an image format that the model can process
            buf = io.BytesIO()
            img = frame.to_image()
            img.save(buf, format="JPEG")  # using PIL.Image.save
            jpeg_bytes = buf.getvalue()
            frame_base64 = base64.b64encode(jpeg_bytes).decode("utf-8")

            # Explicitly type the list to hold instances of BaseMessage
            msgs: List[BaseMessage] = [image_model_system_prompt]
            frame_query = query
            if aggregated_answer:
                frame_query += f"\nPrevious Answer: {aggregated_answer}"
            frame_query += ("\nProvide a concise answer based on the previous answer and the current frame. "
                            "If the current frame does not answer the question but there is a previous answer, return the previous answer. "
                            "REMEMBER: This question is not about the current frame! It is about the video as a whole. ALWAYS PAY ATTENTION TO THE PREVIOUS ANSWER!")
            msgs.append(HumanMessage(content=[
                {
                    "type": "text",
                    "text": frame_query
                },
                {
                    "type": "image",
                    "source_type": "base64",
                    "mime_type": "image/jpeg",
                    "data": frame_base64
                }
            ]))
            response = image_model.invoke(msgs)  # Pass the image bytes to the model

            # Extract the answer from the model's response
            assert isinstance(response.content, str), "The model's response should be a string."
            answer = response.content.strip()
            answers_list.append({"timestamp": timestamp, "answer": answer})
            if answer:
                aggregated_answer = answer
            if CAPTURE_INTERVAL_SEC is not None:
                next_capture_time += CAPTURE_INTERVAL_SEC

    process.terminate()

    final_answer_model = ChatOpenAI(
        model=YOUTUBE_CONFIRMATION_MODEL,  # Model used to confirm the aggregated answer
        api_key=SecretStr(OPENROUTER_API_KEY),  # Your OpenRouter API key
        base_url="https://openrouter.ai/api/v1",  # Standard OpenRouter API base
        verbose=True  # Optional: for debugging
    )
    final_answer_system_message = SystemMessage(
        "You are a brilliant assistant who is eager to help and extremely detail-oriented. "
        "A group of individuals have been asked the same question about a video. "
        "None of the individuals have seen the entire video. "
        "Each individual, when asked the question, was provided a frame from the video, as well as the previously reported answer based on the previous frame. "
        "Your job is to report a final answer for the question about the video. "
        "Ideally, the final answer has already been reported correctly by the last individual. "
        "However, this is similar to the game of telephone, where the true answer can become corrupted along the way. "
        "Assess all of the answers. If you can confirm the final answer is correct, simply return it. "
        "If you notice that the final answer is incorrect, then identify the correct answer and report that. "
        "You will also have access to the video title and description, which may help you identify the correct answer. "
        "Be concise and only respond with the correct final answer!"
    )
    answers_list_str = "\n".join([f"Answer {i+1} at {ans['timestamp']:.2f}s: {ans['answer']}" for i, ans in enumerate(answers_list)])
    final_query = (
        f"Video Title: {info_dict.get('title', 'No title found')}. "
        f"Video Description: {info_dict.get('description', 'No description found')}.\n"
        f"Question about video: {query} "
        f"Answers provided by individuals: \n{answers_list_str}\n\n "
        "Provide a concise final answer to the question about the video based on the previous answers. "
        "Include a short explanation of why you chose this answer. "
        "Format the answer like so: "
        "Explanation: [short explanation]. "
        "Final Answer: [final answer]. "
    )
    final_msgs = [
        final_answer_system_message,
        HumanMessage(content=[
            {
                "type": "text",
                "text": final_query
            }
        ])
    ]
    final_response = final_answer_model.invoke(final_msgs)
    assert isinstance(final_response.content, str), "The final model's response should be a string."
    final_answer = final_response.content.strip()
    return final_answer