import base64
import io
import json
import os
import subprocess
from email.message import Message
from io import StringIO
from pathlib import Path
from typing import List
import av
import pandas as pd
import requests
import yt_dlp
from bs4 import BeautifulSoup
from langchain_core.messages import BaseMessage, HumanMessage, SystemMessage
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
from langchain_tavily import TavilyExtract, TavilySearch
from pydantic import SecretStr
TAVILY_API_KEY = os.getenv("TAVILY_API_KEY", "")
OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY", "")
YOUTUBE_FRAME_ASSESSMENT_MODEL = os.getenv("YOUTUBE_FRAME_ASSESSMENT_MODEL", "google/gemini-2.5-flash-preview-05-20")
YOUTUBE_CONFIRMATION_MODEL = os.getenv("YOUTUBE_CONFIRMATION_MODEL", "google/gemini-2.5-flash-preview-05-20")
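# Configuration is read from the environment: Tavily and OpenRouter API keys, plus the
# OpenRouter model names used by question_youtube_video for per-frame assessment and for
# the final confirmation pass.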
# Define Tools for the Agent
@tool(parse_docstring=True, error_on_invalid_docstring=False)
def download_file_from_url(url: str, filename_override: str | None = None) -> str:
"""
    Downloads a file from a URL to a temp_downloads directory in the cwd. Prefer the filename associated with the URL, but it can be overridden if directed to.
Filename Logic:
1. If `filename_override` is provided, it is used directly.
2. Otherwise, the filename is extracted from the 'Content-Disposition' HTTP header
using Python's `email.message.Message` parser. The result is sanitized.
3. If no filename is provided via override and none can be determined from
the header, a ValueError is raised.
Args:
url: The URL of the file to download.
filename_override: Optional. If provided, this exact name is used for the downloaded file. Using the name associated with the URL is recommended (but may require identifying the extension).
Returns:
The full path to the downloaded file.
Raises:
requests.exceptions.RequestException: For HTTP errors (e.g., 404, network issues).
IOError: If the file cannot be written.
ValueError: If no filename can be determined (neither provided via override
nor found in Content-Disposition header).
"""
try:
with requests.Session() as session:
with session.get(url, stream=True, allow_redirects=True, timeout=30) as response:
response.raise_for_status()
final_filename = None
if filename_override:
final_filename = filename_override
print(f"Using provided filename: {final_filename}")
else:
content_disposition = response.headers.get('content-disposition')
if content_disposition:
msg = Message()
msg['Content-Disposition'] = content_disposition
filename_from_header = msg.get_filename() # Handles various encodings
if filename_from_header:
# Sanitize by taking only the basename to prevent path traversal
final_filename = os.path.basename(filename_from_header)
print(f"Using filename from Content-Disposition: {final_filename}")
if not final_filename:
raise ValueError(
"No filename could be determined. "
"None was provided as an override, and it could not be "
"extracted from the Content-Disposition header."
)
current_dir = Path.cwd()
temp_dir = current_dir / "temp_downloads"
temp_dir.mkdir(parents=True, exist_ok=True)
local_filepath = os.path.join(temp_dir, final_filename)
with open(local_filepath, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
#print(f"File downloaded to: {local_filepath}")
return_str = f"File downloaded successfully. Local File Path: {local_filepath}"
return return_str
except requests.exceptions.RequestException as e:
print(f"Error during download from {url}: {e}")
raise
except IOError as e:
print(f"Error writing file: {e}")
raise
# ValueError will propagate if raised
@tool(parse_docstring=True, error_on_invalid_docstring=False)
def basic_web_search(query: str, search_domains: list[str] | None = None) -> str:
"""
Perform a web search using Tavily. Useful for retrieving relevant URLs and content summaries based on a search query.
The content returned by this tool is limited. For more detailed content extraction, use the `extract_url_content` tool.
    If you would like to limit the search to specific domains, pass a list of domains (e.g., ['wikipedia.org', 'example.com']).
Args:
query (str): The search query to perform.
search_domains (None | list[str]): Optional. A list of domains (E.g., ['wikipedia.org', 'example.com']) to restrict the search to. If None, searches across all domains.
Returns:
        str: A JSON-formatted string containing the search results, including titles, content snippets, and URLs.
"""
search_tool = TavilySearch(
api_key=SecretStr(TAVILY_API_KEY),
max_results=5,
include_raw_content=False,
#include_answer=True,
include_domains=search_domains
)
results = search_tool.invoke({"query": query})
if results and isinstance(results, dict) and len(results["results"]) > 0:
return_dict = {
#"answer": "The following is an unconfirmed answer. Confirm it by extracting cotent from a url." + results.get("answer", ""),
"results": []
}
for result in results["results"]:
if "title" in result and "content" in result and result['score'] > 0.25: # Filter results based on score
return_dict["results"].append({
"title": result["title"],
"url": result["url"],
"content": result["content"],
})
if len(return_dict["results"]) == 0:
return "No results found. If the query is too specific, try a more general search term."
return json.dumps(return_dict, indent=2)
else:
return "No results found. If the query is too specific, try a more general search term."
@tool(parse_docstring=True, error_on_invalid_docstring=False)
def extract_url_content(url_list: list[str]) -> str:
"""
Extracts the content from URLs using Tavily's extract tool.
This tool is useful for retrieving content from web pages.
This tool will most likely be used after a web search to extract content from the URLs returned by the search.
Args:
url_list (list[str]): The URLs to extract content from.
Returns:
str: The extracted content or an error message if extraction fails.
"""
extract_tool = TavilyExtract(api_key=SecretStr(TAVILY_API_KEY))
extract_results = extract_tool.invoke({'urls': url_list})
if extract_results and 'results' in extract_results and len(extract_results['results']) > 0:
        for page_content in extract_results['results']:
            # Drop image data to keep the response compact; the 'images' key may be absent
            page_content.pop('images', None)
            # if len(page_content['raw_content']) > 40000:
            #     page_content['raw_content'] = page_content['raw_content'][:40000] + '... [truncated]'
return json.dumps(extract_results['results'], indent=2)
else:
return f"No content could be extracted from the provided URLs: {url_list}"
def bs_html_parser(url):
response = requests.get(url) # Send a GET request to the URL
# Check if the request was successful
if response.status_code == 200:
return BeautifulSoup(response.text, "html.parser") # Parse and return the HTML
else:
return None # Return None if the request fails
def get_table_title(table_tag):
"""
Extracts a title for a given table tag.
    It looks for a <caption> element, then for the closest preceding heading tag (h1-h6).
"""
title = "Untitled Table"
    # 1. Check for a <caption> element within the table
caption = table_tag.find('caption')
if caption:
caption_text = caption.get_text(strip=True)
if caption_text: # Ensure caption is not empty and use it
return caption_text
# 2. If no caption, look for the closest preceding heading tag (h1-h6)
headings = ['h1', 'h2', 'h3', 'h4', 'h5', 'h6']
# find_all_previous gets all previous tags matching criteria, in reverse document order.
# limit=1 gets the closest one (the last one encountered before the table).
preceding_headings = table_tag.find_all_previous(headings, limit=1)
if preceding_headings:
heading_tag = preceding_headings[0]
# To get the cleanest text, prefer 'mw-headline' if it exists,
# otherwise, clone the heading, remove edit sections, and then get text.
# Try to find a specific 'mw-headline' span first (common in Wikipedia)
headline_span = heading_tag.find("span", class_="mw-headline")
if headline_span:
title_text = headline_span.get_text(strip=True)
else:
# Fallback: create a temporary copy of the heading tag to modify it
# without affecting the main soup.
temp_heading_soup = BeautifulSoup(str(heading_tag), 'html.parser')
temp_heading_tag = temp_heading_soup.find(heading_tag.name)
if temp_heading_tag:
# Remove "edit" links (span with class "mw-editsection")
for span in temp_heading_tag.find_all("span", class_="mw-editsection"):
span.decompose()
title_text = temp_heading_tag.get_text(strip=True)
else:
# If cloning somehow failed, take raw text (less ideal)
title_text = heading_tag.get_text(strip=True)
if title_text: # Ensure title_text is not empty
title = title_text
return title
@tool(parse_docstring=True, error_on_invalid_docstring=False)
def wikipedia_reader(url: str) -> str:
"""
Extracts sections, paragraphs, and tables from a Wikipedia page.
Args:
url (str): The URL of the Wikipedia page to extract content from.
Returns:
str: A JSON string containing sections, paragraphs, and tables.
"""
soup = bs_html_parser(url)
if not soup:
return "" # Return empty if soup creation failed
def extract_links(soup_obj):
links = []
for link in soup_obj.find_all('a', href=True):
href = link.get('href')
# Filter for internal page links (sections)
if href and href.startswith("#") and "#cite_" not in href and len(href) > 1:
links.append(url+href)
# Original logic for other links starting with the base URL (might need adjustment based on desired links)
# elif href and href.startswith(url):
# links.append(href)
return links
links = extract_links(soup)
def extract_paragraphs(soup_obj):
paragraphs_text = [p.get_text(strip=True) for p in soup_obj.find_all("p")]
return [p for p in paragraphs_text if p and len(p) > 10]
paragraphs = extract_paragraphs(soup)
def extract_tables(soup_obj):
tables_with_titles = []
for table_tag in soup_obj.find_all("table", {"class": "wikitable"}):
title = get_table_title(table_tag) # Get the title
try:
# Pandas read_html expects a string or file-like object
table_html_str = str(table_tag)
# Using StringIO to simulate a file, as read_html can be sensitive
df_list = pd.read_html(StringIO(table_html_str))
if df_list:
df = df_list[0] # read_html returns a list of DataFrames
tables_with_titles.append({"title": title, "table_data": df.to_dict(orient='records')})
else:
tables_with_titles.append({"title": title, "table_data": None, "error": "pd.read_html returned empty list"})
except Exception as e:
tables_with_titles.append({"title": title, "table_data" : None, "error": str(e)})
return tables_with_titles
tables = extract_tables(soup) # This now returns a list of dicts
return_dict = {
"sections": links,
"paragraphs": paragraphs,
"tables": tables
}
return json.dumps(return_dict, indent=2, ensure_ascii=False) # Return as JSON string
# Singleton wrapper for the Whisper model
# The model is loaded only once, the first time the transcription tool is used, and then reused.
class WhisperTranscriber:
_instance = None
def __new__(cls):
if cls._instance is None:
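            # Lazy imports: torch and transformers are only pulled in the first time the
            # transcriber is requested, so importing this module stays cheap.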
import torch
from transformers import AutoModelForSpeechSeq2Seq, AutoProcessor
from transformers.pipelines import pipeline
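            # Prefer GPU with float16 when CUDA is available; otherwise fall back to CPU with float32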
device = "cuda:0" if torch.cuda.is_available() else "cpu"
torch_dtype = torch.float16 if torch.cuda.is_available() else torch.float32
model_id = "openai/whisper-large-v3"
model = AutoModelForSpeechSeq2Seq.from_pretrained(
model_id, torch_dtype=torch_dtype, low_cpu_mem_usage=True, use_safetensors=True
)
model.to(device)
processor = AutoProcessor.from_pretrained(model_id)
pipe = pipeline(
"automatic-speech-recognition",
model=model,
tokenizer=processor.tokenizer,
feature_extractor=processor.feature_extractor,
torch_dtype=torch_dtype,
device=device,
)
cls._instance = pipe
return cls._instance
@tool(parse_docstring=True, error_on_invalid_docstring=False)
def transcribe_audio_file(file_path: str) -> str:
"""
Transcribes an audio file to text using OpenAI's Whisper-large-v3 model, caching the model after the first load.
Args:
file_path (str): The path to the audio file to transcribe.
Returns:
str: The transcription of the audio file.
"""
pipe = WhisperTranscriber()
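    # The ASR pipeline returns a dict; the full transcription is under the "text" key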
transcription = pipe(file_path)["text"]
return transcription.strip() if transcription else "No transcription available."
@tool(parse_docstring=True, error_on_invalid_docstring=False)
def question_youtube_video(video_url: str, query: str) -> str:
"""
Answers a question about a YouTube video.
    The video is streamed and one frame is captured every CAPTURE_INTERVAL_SEC seconds (an environment setting, default 2).
Captured frames are sent sequentially to a multimodal model to answer the question about the video.
The final answer is aggregated from the answers to each frame.
DOES NOT USE AUDIO! ONLY FRAMES FROM THE VIDEO ARE USED TO ANSWER THE QUESTION.
Args:
video_url (str): The URL of the video to capture frames from.
query (str): The question to answer about the video.
Returns:
str: The answer to the question about the video.
"""
    CAPTURE_INTERVAL_SEC = int(os.getenv("CAPTURE_INTERVAL_SEC", "2"))  # Default to 2 seconds if not set
# First, we need to get the video stream URL using yt-dlp
ydl_opts = {
"quiet": True,
"skip_download": True,
"format": "mp4[ext=mp4]+bestaudio/best",
"forceurl": True,
"noplaylist": True,
"writesubtitles": True,
"writeautomaticsub": True,
"subtitlesformat": "vtt",
"subtitleslangs": ['en'],
}
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
info_dict = ydl.extract_info(video_url, download=False)
assert isinstance(info_dict, dict), "Failed to extract video information. Please check the video URL."
stream_url = info_dict.get("url", None)
# Second, we use FFmpeg to capture frames from the video stream
ffmpeg_cmd = [
"ffmpeg",
"-i",
stream_url,
"-f",
"matroska", # container format
"-",
]
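    # FFmpeg remuxes the stream into a Matroska container on stdout so PyAV can decode
    # frames directly from the pipe without writing the video to disk.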
process = subprocess.Popen(
ffmpeg_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
container = av.open(process.stdout)
stream = container.streams.video[0]
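    # time_base converts raw frame PTS values into seconds for interval-based sampling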
time_base = stream.time_base
if time_base is None:
raise ValueError("Could not determine time base for the video stream. Please check the video URL and try again.")
else:
time_base = float(time_base)
# Third, we need to use a multimodal model to analyze the video frames.
if stream_url is None:
raise ValueError("Could not retrieve video stream URL. Please check the video URL and try again.")
else:
image_model = ChatOpenAI(
model="google/gemini-2.5-flash-preview-05-20", # Example multimodal model
api_key=SecretStr(OPENROUTER_API_KEY), # Your OpenRouter API key
base_url="https://openrouter.ai/api/v1", # Standard OpenRouter API base
verbose=True # Optional: for debugging
)
image_model_system_prompt = SystemMessage(
content="You will be shown a frame from a video along with a question about that video and an answer based on the previous frames in the video. "\
"Your task is to analyze the frame and provide an answer to the question using both the current frame and the previous answer. " \
"If the previous answer is reasonable and the current frame can not answer the question return the previous answer. " \
"For example, if the question is about the color of a car and the previous answer is 'red' but the current frame shows no car, you should return 'red'. " \
"If the question is about the greatest number of something in the video, you should return the number counted in the current frame or the previous answer, whichever is greater. " \
"For example, if the current frame has 5 objects but the previous answer is 10 objects, you should return '10'. " \
"Be concise and clear in your answers, and do not repeat the question. " \
)
# Then, we loop through the frames and analyze them one by one, skipping frames based on the capture interval
next_capture_time = 0
aggregated_answer = ''
response = ''
answers_list: List[dict] = []
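    # aggregated_answer is carried forward from frame to frame (telephone-style), so each
    # call sees the best answer so far alongside the current frame.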
for frame in container.decode(stream):
if frame.pts is None:
continue
timestamp = float(frame.pts * time_base)
if CAPTURE_INTERVAL_SEC is None or timestamp >= next_capture_time:
# Convert the frame to an image format that the model can process
buf = io.BytesIO()
img = frame.to_image()
img.save(buf, format="JPEG") # using PIL.Image.save
jpeg_bytes = buf.getvalue()
frame_base64 = base64.b64encode(jpeg_bytes).decode("utf-8")
# Explicitly type the list to hold instances of BaseMessage
msgs: List[BaseMessage] = [image_model_system_prompt]
frame_query = query
if aggregated_answer:
frame_query += f"\nPrevious Answer: {aggregated_answer}"
frame_query += "\nProvide a concise answer based on the previous answer and the current frame. " \
"If the current frame does not answer the question but there is a previous answer, return the previous answer. " \
"REMEMBER: This question is not about the current frame! It is about the video as a whole. ALWAYS PAY ATTENTION TO THE PREVIOUS ANSWER!"
msgs.append(HumanMessage(content = [
{
"type": "text",
"text": frame_query
},
{
"type": "image",
"source_type": "base64",
"mime_type": "image/jpeg",
"data": frame_base64
}
]))
response = image_model.invoke(msgs) # Pass the image bytes to the model
# Extract the answer from the model's response
assert isinstance(response.content, str), "The model's response should be a string."
answer = response.content.strip()
answers_list.append({"timestamp": timestamp, "answer": answer})
if answer:
aggregated_answer = answer
if CAPTURE_INTERVAL_SEC is not None:
next_capture_time += CAPTURE_INTERVAL_SEC
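    # All frames have been processed; stop the FFmpeg subprocess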
process.terminate()
final_answer_model = ChatOpenAI(
model="google/gemini-2.5-flash-preview-05-20", # Example multimodal model
api_key=SecretStr(OPENROUTER_API_KEY), # Your OpenRouter API key
base_url="https://openrouter.ai/api/v1", # Standard OpenRouter API base
verbose=True # Optional: for debugging
)
final_answer_system_message = SystemMessage(
"You are a brilliant assistant who is eager to help and extremely detailed oriented. " \
"A group of individuals have been asked the same question about a video. " \
"None of the individuals have seen the entire video. " \
"Each individual, when asked the question, was provided a frame from the video, as well as the previously reported answer based on the previous frame. " \
"Your job is to report a final answer for the question about the video. " \
"Ideally, the final answer has already been reported correctly by the last individual. " \
"However, this is similar to the game a telephone, where the true answer can become corrupted along the way. " \
"Assess all of the answers. If you can confirm the final answer is correct, simply return it. " \
"If you notice that the final answer is incorrect, then identify the correct answer and report that. " \
"You will also have access to the video title and description, which may help you identify the correct answer. " \
"Be concise and only respond with the correct final answer!"
)
answers_list_str = "\n".join([f"Answer {i+1} at {ans['timestamp']:.2f}s: {ans['answer']}" for i, ans in enumerate(answers_list)])
final_query = (
f"Video Title: {info_dict.get('title', 'No title found')}. "
f"Video Description: {info_dict.get('description', 'No description found')}. "
f"Question about video: {query} "
f"Answers provided by individuals: \n{answers_list_str}\n\n "
"Provide a concise final answer to the question about the video based on the previous answers. "
"Include a short explanation of why you chose this answer. "
"Format the answer like so: "
"Explanation: . "
"Final Answer: . "
)
final_msgs = [
final_answer_system_message,
HumanMessage(content=[
{
"type": "text",
"text": final_query
}
])
]
final_response = final_answer_model.invoke(final_msgs)
assert isinstance(final_response.content, str), "The final model's response should be a string."
final_answer = final_response.content.strip()
return final_answer
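
# Minimal usage sketch (illustrative only; assumes TAVILY_API_KEY is set in the environment).
# @tool-decorated functions are LangChain tool objects, so they are invoked with .invoke()
# and a dict of arguments rather than called like plain functions.
if __name__ == "__main__":
    print(basic_web_search.invoke({"query": "Python programming language"}))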